From eda1507039fdc59176e0323cfce5da3e81b29875 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Mon, 9 Mar 2026 21:16:48 +0800 Subject: [PATCH 1/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.36=E3=80=91worker=5Fprocess=20=E5=8D=95=E6=B5=8B=E8=A1=A5?= =?UTF-8?q?=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/worker/test_worker_process.py | 528 ++++++++++++++++++++++++---- 1 file changed, 460 insertions(+), 68 deletions(-) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index 19430fafce2..f92653042a9 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -13,96 +13,488 @@ # limitations under the License. import logging -import unittest +import sys +from unittest.mock import AsyncMock, MagicMock, patch +import numpy as np +import pytest -class TestInterceptPaddleLoggers(unittest.TestCase): - """Test cases for intercept_paddle_loggers context manager from tools.logger_patch""" +WP = "fastdeploy.worker.worker_process" - def test_intercept_paddle_loggers_with_paddle_prefix(self): - """Test intercept_paddle_loggers configures paddle loggers correctly (line 28-30)""" - from fastdeploy.logger.logger import intercept_paddle_loggers - # Create a logger with existing handlers before interception - test_logger_name = "paddle.test.logger" - test_logger = logging.getLogger(test_logger_name) +def _cfg(**overrides): + """Build a minimal FDConfig-like MagicMock.""" + c = MagicMock() + c.parallel_config.local_engine_worker_queue_port = 9999 + c.parallel_config.tensor_parallel_size = 1 + c.parallel_config.tensor_parallel_rank = 0 + c.parallel_config.data_parallel_size = 1 + c.parallel_config.data_parallel_rank = 0 + c.parallel_config.local_data_parallel_id = 0 + c.parallel_config.engine_pid = 12345 + c.parallel_config.pod_ip = "127.0.0.1" + c.parallel_config.use_ep = False + c.parallel_config.expert_parallel_size = 1 + c.parallel_config.tp_group = MagicMock() + c.parallel_config.shutdown_comm_group_if_worker_idle = False + c.cache_config.num_cpu_blocks = 0 + c.cache_config.total_block_num = 100 + c.scheduler_config.enable_overlap_schedule = False + c.scheduler_config.splitwise_role = "mixed" + c.speculative_config.method = None + c.eplb_config.enable_eplb = False + c.load_config.dynamic_load_weight = False + c.model_config.enable_mm = False + c.model_config.enable_logprob = False + c.model_config.architectures = ["LlamaForCausalLM"] + c.nnode = 1 + for k, v in overrides.items(): + parts = k.split(".") + obj = c + for p in parts[:-1]: + obj = getattr(obj, p) + setattr(obj, parts[-1], v) + return c - # Add some handlers to the logger - handler1 = logging.StreamHandler() - handler2 = logging.StreamHandler() - test_logger.addHandler(handler1) - test_logger.addHandler(handler2) - self.assertEqual(len(test_logger.handlers), 2) - # Use the context manager to intercept paddle loggers - with intercept_paddle_loggers(): - # Get logger inside context - should be configured by interceptor - intercepted_logger = logging.getLogger(test_logger_name) +@pytest.fixture +def proc_factory(): + """Yields a factory that creates PaddleDisWorkerProc with heavy deps mocked.""" + with patch(f"{WP}.current_platform") as plat, patch(f"{WP}.get_worker") as gw: + plat.is_iluvatar.return_value = False + plat.is_cuda.return_value = False + plat.is_xpu.return_value = False + + def make(ranks=1, local_rank=0, **cfg_kw): + from fastdeploy.worker.worker_process import PaddleDisWorkerProc + + return PaddleDisWorkerProc(_cfg(**cfg_kw), ranks=ranks, local_rank=local_rank) - # Verify the logger was reconfigured by the interceptor - self.assertEqual(len(intercepted_logger.handlers), 1) - self.assertIsInstance(intercepted_logger.handlers[0], logging.StreamHandler) - self.assertEqual(intercepted_logger.level, logging.INFO) - self.assertFalse(intercepted_logger.propagate) + make.plat = plat + make.gw = gw + yield make - # Clean up - test_logger.handlers = [] - def test_intercept_paddle_loggers_restores_original(self): - """Test intercept_paddle_loggers restores original getLogger after exit (line 46)""" +# ── Tests from develop baseline (intercept_paddle_loggers) ────────────────── +class TestInterceptPaddleLoggers: + def test_paddle_prefix_configured(self): from fastdeploy.logger.logger import intercept_paddle_loggers - # Store original getLogger before context - original_getLogger = logging.getLogger + lg = logging.getLogger("paddle.test.logger") + lg.addHandler(logging.StreamHandler()) + lg.addHandler(logging.StreamHandler()) + with intercept_paddle_loggers(): + ilg = logging.getLogger("paddle.test.logger") + assert len(ilg.handlers) == 1 + assert ilg.level == logging.INFO + lg.handlers = [] + + def test_restores_and_exception_safe(self): + from fastdeploy.logger.logger import intercept_paddle_loggers - # Use the context manager + orig = logging.getLogger with intercept_paddle_loggers(): - # Inside context, getLogger should be patched - self.assertNotEqual(logging.getLogger, original_getLogger) + assert logging.getLogger != orig + assert logging.getLogger is orig + # also safe on exception + try: + with intercept_paddle_loggers(): + raise ValueError + except ValueError: + pass + assert logging.getLogger is orig - # After exit, getLogger should be restored - self.assertEqual(logging.getLogger, original_getLogger) - def test_intercept_paddle_loggers_non_paddle_logger_unchanged(self): - """Test non-paddle loggers are not affected by intercept_paddle_loggers""" - from fastdeploy.logger.logger import intercept_paddle_loggers +# ── Module-level functions ────────────────────────────────────────────────── +class TestModuleFunctions: + """Tests for get_worker, update_fd_config_for_mm, parse_args, + init_distributed_environment, initialize_fd_config.""" - # Create a non-paddle logger - test_logger_name = "other.test.logger" - test_logger = logging.getLogger(test_logger_name) + # -- get_worker platform dispatch -- + @pytest.mark.parametrize( + "platform,module_path,class_name", + [ + ("is_dcu", "fastdeploy.worker.dcu_worker", "DcuWorker"), + ("is_cuda", "fastdeploy.worker.gpu_worker", "GpuWorker"), + ("is_xpu", "fastdeploy.worker.xpu_worker", "XpuWorker"), + ("is_iluvatar", "fastdeploy.worker.iluvatar_worker", "IluvatarWorker"), + ("is_gcu", "fastdeploy.worker.gcu_worker", "GcuWorker"), + ("is_maca", "fastdeploy.worker.metax_worker", "MetaxWorker"), + ("is_intel_hpu", "fastdeploy.worker.hpu_worker", "HpuWorker"), + ], + ) + def test_get_worker_dispatch(self, platform, module_path, class_name): + from fastdeploy.worker.worker_process import get_worker - # Add a handler - original_handler = logging.StreamHandler() - test_logger.addHandler(original_handler) - original_handler_count = len(test_logger.handlers) + with patch(f"{WP}.current_platform") as plat: + for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): + getattr(plat, a).return_value = False + getattr(plat, platform).return_value = True + mock_mod = MagicMock() + sentinel = MagicMock() + setattr(mock_mod, class_name, MagicMock(return_value=sentinel)) + with patch.dict("sys.modules", {module_path: mock_mod}): + assert get_worker(_cfg(), local_rank=0, rank=1) is sentinel - # Use the context manager - with intercept_paddle_loggers(): - # Get the same logger - result_logger = logging.getLogger(test_logger_name) - # Non-paddle loggers should not be modified - self.assertEqual(len(result_logger.handlers), original_handler_count) - self.assertEqual(result_logger.handlers[0], original_handler) + def test_get_worker_logprob_unsupported_raises(self): + from fastdeploy.worker.worker_process import get_worker - # Clean up - test_logger.handlers = [] + with patch(f"{WP}.current_platform") as plat: + for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): + getattr(plat, a).return_value = False + with pytest.raises(NotImplementedError): + get_worker(_cfg(**{"model_config.enable_logprob": True}), 0, 1) - def test_intercept_paddle_loggers_exception_safety(self): - """Test intercept_paddle_loggers restores getLogger even if exception occurs""" - from fastdeploy.logger.logger import intercept_paddle_loggers + # -- update_fd_config_for_mm -- + def test_update_mm_ernie_sets_fields(self): + from fastdeploy.config import ErnieArchitectures + from fastdeploy.worker.worker_process import update_fd_config_for_mm - original_getLogger = logging.getLogger + fd = _cfg( + **{ + "model_config.enable_mm": True, + "model_config.architectures": ["Ernie4_5ForCausalLM"], + "parallel_config.tensor_parallel_size": 4, + "parallel_config.tensor_parallel_rank": 2, + "model_config.dtype": "float16", + } + ) + with patch.object(ErnieArchitectures, "contains_ernie_arch", return_value=True): + update_fd_config_for_mm(fd) + assert fd.model_config.tensor_model_parallel_size == 4 + assert fd.model_config.vision_config.dtype == "float16" - try: - with intercept_paddle_loggers(): - # Raise an exception inside context - raise ValueError("Test exception") - except ValueError: - pass # Expected + def test_update_mm_non_ernie_and_disabled_skip(self): + from fastdeploy.config import ErnieArchitectures + from fastdeploy.worker.worker_process import update_fd_config_for_mm + + fd = _cfg(**{"model_config.enable_mm": True}) + orig = fd.model_config.tensor_model_parallel_size + with patch.object(ErnieArchitectures, "contains_ernie_arch", return_value=False): + update_fd_config_for_mm(fd) + assert fd.model_config.tensor_model_parallel_size is orig + + fd2 = _cfg(**{"model_config.enable_mm": False}) + orig2 = fd2.model_config.tensor_model_parallel_size + update_fd_config_for_mm(fd2) + assert fd2.model_config.tensor_model_parallel_size is orig2 + + # -- parse_args -- + def test_parse_args_defaults_and_custom(self): + from fastdeploy.worker.worker_process import parse_args + + with patch.object(sys, "argv", ["prog"]): + a = parse_args() + assert a.model == "./output" and a.dtype == "bfloat16" and a.tensor_parallel_size == 1 + + argv = ["prog", "-m", "/tmp/m", "--dtype", "float16", "--do_profile", "--tensor_parallel_size", "4"] + with patch.object(sys, "argv", argv): + a = parse_args() + assert a.model == "/tmp/m" and a.do_profile and a.tensor_parallel_size == 4 + + def test_parse_args_json_configs(self): + from fastdeploy.worker.worker_process import parse_args + + argv = [ + "prog", + "--speculative_config", + '{"method":"eagle"}', + "--quantization", + '{"quant_type":"wint4"}', + "--eplb_config", + '{"enable_eplb":true}', + ] + with patch.object(sys, "argv", argv): + a = parse_args() + assert a.speculative_config["method"] == "eagle" + assert a.eplb_config["enable_eplb"] + + # -- init_distributed_environment -- + def test_init_dist_multi_and_single_rank(self): + from fastdeploy.worker.worker_process import init_distributed_environment + + with patch(f"{WP}.dist") as dist, patch(f"{WP}.fleet") as fleet: + dist.get_world_size.return_value = 2 + fleet.worker_index.return_value = 1 + r, lr = init_distributed_environment(seed=42) + assert (r, lr) == (2, 1) + fleet.init.assert_called_once() + + with patch(f"{WP}.dist") as dist, patch(f"{WP}.fleet"): + dist.get_world_size.return_value = 0 + r, lr = init_distributed_environment() + assert (r, lr) == (0, 0) + + # -- initialize_fd_config -- + def test_initialize_fd_config_creates_config(self): + from fastdeploy.worker.worker_process import parse_args + + with patch.object(sys, "argv", ["prog", "-m", "/tmp/m", "--dtype", "float16"]): + args = parse_args() + with ( + patch(f"{WP}.v1_loader_support", return_value=True), + patch(f"{WP}.parse_quant_config", return_value=None), + patch(f"{WP}.update_fd_config_for_mm") as upd, + patch(f"{WP}.current_platform") as plat, + patch(f"{WP}.paddle"), + patch(f"{WP}.ModelConfig") as mc, + patch(f"{WP}.DeviceConfig"), + patch(f"{WP}.SpeculativeConfig"), + patch(f"{WP}.ParallelConfig") as pc, + patch(f"{WP}.CacheConfig"), + patch(f"{WP}.SchedulerConfig"), + patch(f"{WP}.EPLBConfig"), + patch(f"{WP}.LoadConfig") as lc, + patch(f"{WP}.GraphOptimizationConfig"), + patch(f"{WP}.PlasAttentionConfig"), + patch(f"{WP}.EarlyStopConfig"), + patch(f"{WP}.StructuredOutputsConfig"), + patch(f"{WP}.RoutingReplayConfig"), + patch(f"{WP}.FDConfig") as fd, + ): + for a in ("is_cuda", "is_xpu", "is_maca", "is_iluvatar", "is_intel_hpu"): + getattr(plat, a).return_value = a == "is_cuda" + mc.return_value.num_hidden_layers = 2 + mc.return_value.architectures = ["LlamaForCausalLM"] + mc.return_value.is_quantized = False + mc.return_value.quantization_config = None + mc.return_value.head_dim = 128 + mc.return_value.pretrained_config = MagicMock() + pc.return_value.tensor_parallel_size = 1 + pc.return_value.data_parallel_size = 1 + pc.return_value.expert_parallel_size = 1 + pc.return_value.use_ep = False + lc.return_value.dynamic_load_weight = False + lc.return_value.load_strategy = "ipc_snapshot" + lc.return_value.rsync_config = None + lc.return_value.load_choices = "default_v1" + from fastdeploy.worker.worker_process import initialize_fd_config + + initialize_fd_config(args, ranks=1, local_rank=0) + fd.assert_called_once() + upd.assert_called_once() + + +# ── PaddleDisWorkerProc lifecycle ─────────────────────────────────────────── +class TestPaddleDisWorkerProc: + """End-to-end tests for the worker process class: init → services → model + lifecycle → kv cache → eplb → control methods → barriers.""" + + # -- constructor -- + def test_init_stores_attrs_and_chips(self, proc_factory): + p = proc_factory(ranks=2, local_rank=1) + assert p.ranks == 2 and p.local_rank == 1 and p.max_chips_per_node == 8 + proc_factory.gw.assert_called_once() + + proc_factory.plat.is_iluvatar.return_value = True + proc_factory.gw.reset_mock() + p2 = proc_factory() + assert p2.max_chips_per_node == 16 + + def test_init_speculative_and_overlap(self, proc_factory): + proc_factory.plat.is_cuda.return_value = True + p = proc_factory( + **{ + "speculative_config.method": "eagle", + "scheduler_config.enable_overlap_schedule": True, + } + ) + assert p.speculative_decoding and not p.enable_overlap_schedule + + proc_factory.gw.reset_mock() + p2 = proc_factory(**{"scheduler_config.enable_overlap_schedule": True, "speculative_config.method": None}) + assert p2.enable_overlap_schedule + + # -- init_control + health_status + task_queue -- + def test_init_control_creates_fmq_queue(self, proc_factory): + with patch(f"{WP}.FMQ") as fmq: + p = proc_factory(**{"parallel_config.local_engine_worker_queue_port": 5555}) + p.local_rank = 3 + p.init_control() + fmq.return_value.queue.assert_called_once_with("ctrl_w2e_rank3_5555", "producer") + + def test_init_health_status_single_and_multi_dp(self, proc_factory): + with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.time") as t: + t.time.return_value = 1000.0 + proc_factory(**{"parallel_config.data_parallel_size": 1}).init_health_status() + single_count = ipc.call_count + + with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.time") as t, patch(f"{WP}.envs") as env: + t.time.return_value = 1000.0 + env.FD_ENABLE_MULTI_API_SERVER = False + sig = MagicMock() + sig.value.__getitem__ = MagicMock(return_value=1) + ipc.return_value = sig + proc_factory( + **{ + "parallel_config.data_parallel_size": 2, + "parallel_config.local_data_parallel_id": 0, + } + ).init_health_status() + assert ipc.call_count > single_count + + def test_task_queue_shm_and_tcp(self, proc_factory): + with patch(f"{WP}.envs") as env, patch(f"{WP}.TaskQueue") as tq: + env.FD_ENGINE_TASK_QUEUE_WITH_SHM = True + proc_factory(**{"parallel_config.local_engine_worker_queue_port": 7777}).start_task_queue_service() + assert "fd_task_queue_7777" in tq.call_args[1]["address"] + + with patch(f"{WP}.envs") as env, patch(f"{WP}.TaskQueue") as tq: + env.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + proc_factory( + **{ + "parallel_config.pod_ip": "10.0.0.1", + "parallel_config.local_engine_worker_queue_port": 8888, + } + ).start_task_queue_service() + assert tq.call_args[1]["address"] == ("10.0.0.1", 8888) + + # -- model lifecycle -- + def test_load_model_and_init_device(self, proc_factory): + with patch(f"{WP}.IPCSignal") as ipc: + sig = MagicMock() + sig.value = np.zeros([1], dtype=np.int32) + ipc.return_value = sig + p = proc_factory() + p.load_model() + p.worker.load_model.assert_called_once() + assert p.loaded_model_signal.value[0] == 1 + + with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.paddle") as pdl: + sig = MagicMock() + sig.value = np.zeros([1], dtype=np.int32) + ipc.return_value = sig + proc_factory(ranks=2).load_model() + pdl.distributed.barrier.assert_called_once() + + proc_factory().init_device() + + def test_graph_optimize_and_splitwise(self, proc_factory): + with patch(f"{WP}.envs") as env: + env.ENABLE_V1_KVCACHE_SCHEDULER = True + p = proc_factory() + p.graph_optimize_and_warm_up_model() + p.worker.graph_optimize_and_warm_up_model.assert_called_once() + + with patch(f"{WP}.envs") as env, patch(f"{WP}.IPCSignal"): + env.ENABLE_V1_KVCACHE_SCHEDULER = False + p = proc_factory(**{"scheduler_config.splitwise_role": "prefill"}) + p.worker.model_runner.device_id = 0 + p.graph_optimize_and_warm_up_model() + + # -- kv cache -- + def test_kv_cache_no_profile(self, proc_factory): + p = proc_factory(**{"parallel_config.do_profile": False, "cache_config.total_block_num": 42}) + p.initialize_kv_cache() + p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=42) + + def test_kv_cache_profile_normal_and_cap(self, proc_factory): + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): + p = proc_factory(**{"parallel_config.do_profile": True}) + p.worker.determine_available_memory.return_value = 1024**3 + p.worker.cal_theortical_kvcache.return_value = 1024**2 + p.initialize_kv_cache() + p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=1024) + + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): + proc_factory.gw.reset_mock() + p = proc_factory(**{"parallel_config.do_profile": True}) + p.worker.determine_available_memory.return_value = 100 * 1024**3 + p.worker.cal_theortical_kvcache.return_value = 1024 + p.initialize_kv_cache() + p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=40000) + + def test_kv_cache_zero_blocks_raises(self, proc_factory): + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): + p = proc_factory(**{"parallel_config.do_profile": True}) + p.worker.determine_available_memory.return_value = 0 + p.worker.cal_theortical_kvcache.return_value = 1024 + with pytest.raises(ValueError): + p.initialize_kv_cache() + + def test_kv_cache_multi_rank_all_reduces(self, proc_factory): + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist") as dist, patch(f"{WP}.paddle") as pdl: + mock_t = MagicMock() + mock_t.item.return_value = 500 + pdl.full.return_value = mock_t + p = proc_factory(ranks=2, **{"parallel_config.do_profile": True}) + p.worker.determine_available_memory.return_value = 1024**3 + p.worker.cal_theortical_kvcache.return_value = 1024**2 + p.initialize_kv_cache() + dist.all_reduce.assert_called_once() + + # -- run_control_method -- + def test_control_method_success_and_errors(self, proc_factory): + p = proc_factory() + p._ctrl_output = MagicMock() + p._ctrl_output.put = AsyncMock() + + # unknown → 400 + p.worker.bad = None + req = MagicMock() + req.request_id, req.method, req.args = "r1", "bad", {} + p.run_control_method(req) + + # success → 200 + p.worker.do_it = MagicMock(return_value={"ok": True}) + req.method, req.args = "do_it", {"x": 1} + p.run_control_method(req) + p.worker.do_it.assert_called_once_with(x=1) + + # exception → 500 + p.worker.fail = MagicMock(side_effect=RuntimeError("boom")) + req.method, req.args = "fail", {} + p.run_control_method(req) + p.worker.fail.assert_called_once() + + # -- eplb -- + def test_eplb_disabled_and_enabled(self, proc_factory): + p = proc_factory(**{"eplb_config.enable_eplb": False}) + p._init_eplb_signal() + assert not hasattr(p, "experts_manager") + p._run_eplb(tp_rank=0) + + with patch(f"{WP}.RedundantExpertManager") as rem, patch(f"{WP}.IPCSignal"), patch(f"{WP}.create_mmap"): + p2 = proc_factory( + **{ + "eplb_config.enable_eplb": True, + "model_config.num_hidden_layers": 4, + "model_config.moe_num_experts": 8, + } + ) + p2._init_eplb_signal() + rem.assert_called_once() + + # -- barrier / broadcast / update_weights -- + def test_tp_barrier_default_and_xpu(self, proc_factory): + with patch(f"{WP}.paddle") as pdl: + p = proc_factory() + p.enable_overlap_schedule = False + p._tp_barrier_wait() + pdl.distributed.barrier.assert_called_once() - # After exception, getLogger should still be restored - self.assertEqual(logging.getLogger, original_getLogger) + proc_factory.plat.is_xpu.return_value = True + p2 = proc_factory() + p2.task_queue = MagicMock() + p2._tp_barrier_wait() + p2.task_queue.worker_process_tp_barrier.wait.assert_called_once() + def test_broadcast_model_weights_signal(self, proc_factory): + with patch(f"{WP}.paddle") as pdl: + p = proc_factory(ranks=2) + p.model_weights_signal = np.array([42], dtype=np.int32) + mock_t = MagicMock() + mock_t.numpy.return_value = np.array([42]) + pdl.full.return_value = mock_t + assert p._broadcast_model_weights_signal(src=0, group=None) == 42 -if __name__ == "__main__": - unittest.main() + def test_update_weights_from_tensor(self, proc_factory): + with patch(f"{WP}.load_tensor_from_shm_mem") as load, patch(f"{WP}.MODEL_MAIN_NAME", "main"): + p = proc_factory() + p.experts_manager = MagicMock() + p.experts_manager.tensor_infos = {"x": 1} + p.experts_manager.get_ep_rank_to_expert_id_list.return_value = ([1], {0: 1}, 1) + load.return_value = {"w": MagicMock()} + p.update_weights_from_tensor({"main": "data"}) + load.assert_called_once() + assert p.experts_manager.tensor_infos is None From 97a218504417990f4f601d3c9ca03565a7f8309f Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Tue, 10 Mar 2026 01:57:30 +0800 Subject: [PATCH 2/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.36=E3=80=91rewrite=20worker=5Fprocess=20test=20=E2=80=94=20lea?= =?UTF-8?q?n=20flat=20structure,=2080%=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Flatten class-based tests to flat pytest functions (32→25 tests) - Add _run_eplb enabled path (token stats + weight update + value=None) - Add initialize_fd_config EP branch + quant_config branch - Add kv_cache cap at 40000, graph_optimize splitwise path - Add run_worker_proc with deterministic mode + iluvatar dispatch - Coverage: 69% → 80% (395/493 stmts) --- tests/worker/test_worker_process.py | 904 +++++++++++++++------------- 1 file changed, 472 insertions(+), 432 deletions(-) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index f92653042a9..10ade3bb427 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -14,6 +14,7 @@ import logging import sys +from contextlib import ExitStack from unittest.mock import AsyncMock, MagicMock, patch import numpy as np @@ -22,8 +23,11 @@ WP = "fastdeploy.worker.worker_process" +# -- helpers ------------------------------------------------------------------- + + def _cfg(**overrides): - """Build a minimal FDConfig-like MagicMock.""" + """Minimal FDConfig-like MagicMock.""" c = MagicMock() c.parallel_config.local_engine_worker_queue_port = 9999 c.parallel_config.tensor_parallel_size = 1 @@ -58,443 +62,479 @@ def _cfg(**overrides): @pytest.fixture -def proc_factory(): - """Yields a factory that creates PaddleDisWorkerProc with heavy deps mocked.""" +def pw(): + """Patches current_platform and get_worker at module level.""" with patch(f"{WP}.current_platform") as plat, patch(f"{WP}.get_worker") as gw: plat.is_iluvatar.return_value = False plat.is_cuda.return_value = False plat.is_xpu.return_value = False + yield plat, gw + + +def _make(pw, ranks=1, local_rank=0, **cfg_kw): + """Create PaddleDisWorkerProc with heavy deps mocked.""" + from fastdeploy.worker.worker_process import PaddleDisWorkerProc + + return PaddleDisWorkerProc(_cfg(**cfg_kw), ranks=ranks, local_rank=local_rank) + + +# -- intercept_paddle_loggers ------------------------------------------------- + + +def test_intercept_paddle_loggers(): + from fastdeploy.logger.logger import intercept_paddle_loggers + + lg = logging.getLogger("paddle.test.logger") + lg.addHandler(logging.StreamHandler()) + lg.addHandler(logging.StreamHandler()) + with intercept_paddle_loggers(): + assert len(logging.getLogger("paddle.test.logger").handlers) == 1 + lg.handlers = [] + orig = logging.getLogger + with intercept_paddle_loggers(): + assert logging.getLogger != orig + assert logging.getLogger is orig + + +# -- get_worker ---------------------------------------------------------------- - def make(ranks=1, local_rank=0, **cfg_kw): - from fastdeploy.worker.worker_process import PaddleDisWorkerProc - - return PaddleDisWorkerProc(_cfg(**cfg_kw), ranks=ranks, local_rank=local_rank) - - make.plat = plat - make.gw = gw - yield make - - -# ── Tests from develop baseline (intercept_paddle_loggers) ────────────────── -class TestInterceptPaddleLoggers: - def test_paddle_prefix_configured(self): - from fastdeploy.logger.logger import intercept_paddle_loggers - - lg = logging.getLogger("paddle.test.logger") - lg.addHandler(logging.StreamHandler()) - lg.addHandler(logging.StreamHandler()) - with intercept_paddle_loggers(): - ilg = logging.getLogger("paddle.test.logger") - assert len(ilg.handlers) == 1 - assert ilg.level == logging.INFO - lg.handlers = [] - - def test_restores_and_exception_safe(self): - from fastdeploy.logger.logger import intercept_paddle_loggers - - orig = logging.getLogger - with intercept_paddle_loggers(): - assert logging.getLogger != orig - assert logging.getLogger is orig - # also safe on exception - try: - with intercept_paddle_loggers(): - raise ValueError - except ValueError: - pass - assert logging.getLogger is orig - - -# ── Module-level functions ────────────────────────────────────────────────── -class TestModuleFunctions: - """Tests for get_worker, update_fd_config_for_mm, parse_args, - init_distributed_environment, initialize_fd_config.""" - - # -- get_worker platform dispatch -- - @pytest.mark.parametrize( - "platform,module_path,class_name", - [ - ("is_dcu", "fastdeploy.worker.dcu_worker", "DcuWorker"), - ("is_cuda", "fastdeploy.worker.gpu_worker", "GpuWorker"), - ("is_xpu", "fastdeploy.worker.xpu_worker", "XpuWorker"), - ("is_iluvatar", "fastdeploy.worker.iluvatar_worker", "IluvatarWorker"), - ("is_gcu", "fastdeploy.worker.gcu_worker", "GcuWorker"), - ("is_maca", "fastdeploy.worker.metax_worker", "MetaxWorker"), - ("is_intel_hpu", "fastdeploy.worker.hpu_worker", "HpuWorker"), - ], + +@pytest.mark.parametrize( + "platform,module_path,class_name", + [ + ("is_dcu", "fastdeploy.worker.dcu_worker", "DcuWorker"), + ("is_cuda", "fastdeploy.worker.gpu_worker", "GpuWorker"), + ("is_xpu", "fastdeploy.worker.xpu_worker", "XpuWorker"), + ("is_iluvatar", "fastdeploy.worker.iluvatar_worker", "IluvatarWorker"), + ("is_gcu", "fastdeploy.worker.gcu_worker", "GcuWorker"), + ("is_maca", "fastdeploy.worker.metax_worker", "MetaxWorker"), + ("is_intel_hpu", "fastdeploy.worker.hpu_worker", "HpuWorker"), + ], +) +def test_get_worker(platform, module_path, class_name): + from fastdeploy.worker.worker_process import get_worker + + with patch(f"{WP}.current_platform") as plat: + for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): + getattr(plat, a).return_value = False + getattr(plat, platform).return_value = True + mock_mod = MagicMock() + sentinel = MagicMock() + setattr(mock_mod, class_name, MagicMock(return_value=sentinel)) + with patch.dict("sys.modules", {module_path: mock_mod}): + assert get_worker(_cfg(), local_rank=0, rank=1) is sentinel + + +# -- Utility functions --------------------------------------------------------- + + +def test_update_mm(): + from fastdeploy.config import ErnieArchitectures + from fastdeploy.worker.worker_process import update_fd_config_for_mm + + fd = _cfg( + **{ + "model_config.enable_mm": True, + "model_config.architectures": ["Ernie4_5ForCausalLM"], + "parallel_config.tensor_parallel_size": 4, + "parallel_config.tensor_parallel_rank": 2, + "model_config.dtype": "float16", + } + ) + with patch.object(ErnieArchitectures, "contains_ernie_arch", return_value=True): + update_fd_config_for_mm(fd) + assert fd.model_config.tensor_model_parallel_size == 4 + fd2 = _cfg(**{"model_config.enable_mm": False}) + orig = fd2.model_config.tensor_model_parallel_size + update_fd_config_for_mm(fd2) + assert fd2.model_config.tensor_model_parallel_size is orig + + +def test_parse_args(): + from fastdeploy.worker.worker_process import parse_args + + with patch.object(sys, "argv", ["prog"]): + a = parse_args() + assert a.model == "./output" and a.dtype == "bfloat16" + argv = [ + "prog", + "-m", + "/tmp/m", + "--dtype", + "float16", + "--tensor_parallel_size", + "4", + "--speculative_config", + '{"method":"eagle"}', + "--eplb_config", + '{"enable_eplb":true}', + ] + with patch.object(sys, "argv", argv): + a = parse_args() + assert a.model == "/tmp/m" and a.tensor_parallel_size == 4 + assert a.speculative_config["method"] == "eagle" and a.eplb_config["enable_eplb"] + + +def test_init_distributed(): + from fastdeploy.worker.worker_process import init_distributed_environment + + with patch(f"{WP}.dist") as dist, patch(f"{WP}.fleet") as fleet: + dist.get_world_size.return_value = 2 + fleet.worker_index.return_value = 1 + assert init_distributed_environment(seed=42) == (2, 1) + fleet.init.assert_called_once() + with patch(f"{WP}.dist") as dist, patch(f"{WP}.fleet"): + dist.get_world_size.return_value = 0 + assert init_distributed_environment() == (0, 0) + + +# -- initialize_fd_config ----------------------------------------------------- + + +_FD_PATCH_NAMES = [ + "v1_loader_support", + "parse_quant_config", + "update_fd_config_for_mm", + "current_platform", + "paddle", + "ModelConfig", + "DeviceConfig", + "SpeculativeConfig", + "ParallelConfig", + "CacheConfig", + "SchedulerConfig", + "EPLBConfig", + "LoadConfig", + "GraphOptimizationConfig", + "PlasAttentionConfig", + "EarlyStopConfig", + "StructuredOutputsConfig", + "RoutingReplayConfig", + "FDConfig", +] + + +def _fd_config_env(): + """Return (args, ExitStack, mocks_dict) for initialize_fd_config tests.""" + from fastdeploy.worker.worker_process import parse_args + + with patch.object(sys, "argv", ["prog", "-m", "/tmp/m", "--dtype", "float16"]): + args = parse_args() + stack = ExitStack() + m = {n: stack.enter_context(patch(f"{WP}.{n}")) for n in _FD_PATCH_NAMES} + for a in ("is_cuda", "is_xpu", "is_maca", "is_iluvatar", "is_intel_hpu"): + getattr(m["current_platform"], a).return_value = a == "is_cuda" + mc = m["ModelConfig"].return_value + mc.num_hidden_layers = 2 + mc.architectures = ["LlamaForCausalLM"] + mc.is_quantized = False + mc.quantization_config = None + mc.head_dim = 128 + mc.pretrained_config = MagicMock() + pc = m["ParallelConfig"].return_value + pc.tensor_parallel_size = 1 + pc.data_parallel_size = 1 + pc.expert_parallel_size = 1 + pc.use_ep = False + lc = m["LoadConfig"].return_value + lc.dynamic_load_weight = False + lc.load_strategy = "ipc_snapshot" + lc.rsync_config = None + lc.load_choices = "default_v1" + m["parse_quant_config"].return_value = None + return args, stack, m + + +def test_initialize_fd_config(): + """Basic path + EP/quant branches.""" + from fastdeploy.worker.worker_process import initialize_fd_config + + args, stack, m = _fd_config_env() + with stack: + initialize_fd_config(args, ranks=1, local_rank=0) + m["FDConfig"].assert_called_once() + m["update_fd_config_for_mm"].assert_called_once() + # EP + quant path + args2, stack2, m2 = _fd_config_env() + with stack2: + m2["ParallelConfig"].return_value.data_parallel_size = 2 + m2["ParallelConfig"].return_value.expert_parallel_size = 2 + m2["ModelConfig"].return_value.moe_num_experts = [8] + m2["EPLBConfig"].return_value.redundant_experts_num = 0 + m2["parse_quant_config"].return_value = MagicMock() + m2["ModelConfig"].return_value.is_quantized = True + initialize_fd_config(args2, ranks=2, local_rank=0) + m2["FDConfig"].assert_called_once() + + +# -- PaddleDisWorkerProc ------------------------------------------------------- + + +def test_proc_init_and_control(pw): + """Constructor fields + init_control.""" + plat, gw = pw + p = _make(pw, ranks=2, local_rank=1) + assert p.ranks == 2 and p.local_rank == 1 and p.max_chips_per_node == 8 + gw.assert_called_once() + # Iluvatar → 16 chips + plat.is_iluvatar.return_value = True + gw.reset_mock() + assert _make(pw).max_chips_per_node == 16 + plat.is_iluvatar.return_value = False + # Speculative + overlap + plat.is_cuda.return_value = True + gw.reset_mock() + p3 = _make( + pw, + **{ + "speculative_config.method": "eagle", + "scheduler_config.enable_overlap_schedule": True, + }, ) - def test_get_worker_dispatch(self, platform, module_path, class_name): - from fastdeploy.worker.worker_process import get_worker - - with patch(f"{WP}.current_platform") as plat: - for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): - getattr(plat, a).return_value = False - getattr(plat, platform).return_value = True - mock_mod = MagicMock() - sentinel = MagicMock() - setattr(mock_mod, class_name, MagicMock(return_value=sentinel)) - with patch.dict("sys.modules", {module_path: mock_mod}): - assert get_worker(_cfg(), local_rank=0, rank=1) is sentinel - - def test_get_worker_logprob_unsupported_raises(self): - from fastdeploy.worker.worker_process import get_worker - - with patch(f"{WP}.current_platform") as plat: - for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): - getattr(plat, a).return_value = False - with pytest.raises(NotImplementedError): - get_worker(_cfg(**{"model_config.enable_logprob": True}), 0, 1) - - # -- update_fd_config_for_mm -- - def test_update_mm_ernie_sets_fields(self): - from fastdeploy.config import ErnieArchitectures - from fastdeploy.worker.worker_process import update_fd_config_for_mm - - fd = _cfg( + assert p3.speculative_decoding and not p3.enable_overlap_schedule + plat.is_cuda.return_value = False + # init_control + with patch(f"{WP}.FMQ") as fmq: + p4 = _make(pw, **{"parallel_config.local_engine_worker_queue_port": 5555}) + p4.local_rank = 3 + p4.init_control() + fmq.return_value.queue.assert_called_once_with("ctrl_w2e_rank3_5555", "producer") + + +def test_health_and_task_queue(pw): + """init_health_status + start_task_queue_service.""" + with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.time") as t: + t.time.return_value = 1000.0 + _make(pw, **{"parallel_config.data_parallel_size": 1}).init_health_status() + assert ipc.call_count > 0 + # SHM task queue + with patch(f"{WP}.envs") as env, patch(f"{WP}.TaskQueue") as tq: + env.FD_ENGINE_TASK_QUEUE_WITH_SHM = True + _make(pw, **{"parallel_config.local_engine_worker_queue_port": 7777}).start_task_queue_service() + assert "fd_task_queue_7777" in tq.call_args[1]["address"] + # TCP task queue + with patch(f"{WP}.envs") as env, patch(f"{WP}.TaskQueue") as tq: + env.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + _make( + pw, **{ - "model_config.enable_mm": True, - "model_config.architectures": ["Ernie4_5ForCausalLM"], - "parallel_config.tensor_parallel_size": 4, - "parallel_config.tensor_parallel_rank": 2, - "model_config.dtype": "float16", - } - ) - with patch.object(ErnieArchitectures, "contains_ernie_arch", return_value=True): - update_fd_config_for_mm(fd) - assert fd.model_config.tensor_model_parallel_size == 4 - assert fd.model_config.vision_config.dtype == "float16" - - def test_update_mm_non_ernie_and_disabled_skip(self): - from fastdeploy.config import ErnieArchitectures - from fastdeploy.worker.worker_process import update_fd_config_for_mm - - fd = _cfg(**{"model_config.enable_mm": True}) - orig = fd.model_config.tensor_model_parallel_size - with patch.object(ErnieArchitectures, "contains_ernie_arch", return_value=False): - update_fd_config_for_mm(fd) - assert fd.model_config.tensor_model_parallel_size is orig - - fd2 = _cfg(**{"model_config.enable_mm": False}) - orig2 = fd2.model_config.tensor_model_parallel_size - update_fd_config_for_mm(fd2) - assert fd2.model_config.tensor_model_parallel_size is orig2 - - # -- parse_args -- - def test_parse_args_defaults_and_custom(self): - from fastdeploy.worker.worker_process import parse_args - - with patch.object(sys, "argv", ["prog"]): - a = parse_args() - assert a.model == "./output" and a.dtype == "bfloat16" and a.tensor_parallel_size == 1 - - argv = ["prog", "-m", "/tmp/m", "--dtype", "float16", "--do_profile", "--tensor_parallel_size", "4"] - with patch.object(sys, "argv", argv): - a = parse_args() - assert a.model == "/tmp/m" and a.do_profile and a.tensor_parallel_size == 4 - - def test_parse_args_json_configs(self): - from fastdeploy.worker.worker_process import parse_args - - argv = [ - "prog", - "--speculative_config", - '{"method":"eagle"}', - "--quantization", - '{"quant_type":"wint4"}', - "--eplb_config", - '{"enable_eplb":true}', - ] - with patch.object(sys, "argv", argv): - a = parse_args() - assert a.speculative_config["method"] == "eagle" - assert a.eplb_config["enable_eplb"] - - # -- init_distributed_environment -- - def test_init_dist_multi_and_single_rank(self): - from fastdeploy.worker.worker_process import init_distributed_environment - - with patch(f"{WP}.dist") as dist, patch(f"{WP}.fleet") as fleet: - dist.get_world_size.return_value = 2 - fleet.worker_index.return_value = 1 - r, lr = init_distributed_environment(seed=42) - assert (r, lr) == (2, 1) - fleet.init.assert_called_once() - - with patch(f"{WP}.dist") as dist, patch(f"{WP}.fleet"): - dist.get_world_size.return_value = 0 - r, lr = init_distributed_environment() - assert (r, lr) == (0, 0) - - # -- initialize_fd_config -- - def test_initialize_fd_config_creates_config(self): - from fastdeploy.worker.worker_process import parse_args - - with patch.object(sys, "argv", ["prog", "-m", "/tmp/m", "--dtype", "float16"]): - args = parse_args() - with ( - patch(f"{WP}.v1_loader_support", return_value=True), - patch(f"{WP}.parse_quant_config", return_value=None), - patch(f"{WP}.update_fd_config_for_mm") as upd, - patch(f"{WP}.current_platform") as plat, - patch(f"{WP}.paddle"), - patch(f"{WP}.ModelConfig") as mc, - patch(f"{WP}.DeviceConfig"), - patch(f"{WP}.SpeculativeConfig"), - patch(f"{WP}.ParallelConfig") as pc, - patch(f"{WP}.CacheConfig"), - patch(f"{WP}.SchedulerConfig"), - patch(f"{WP}.EPLBConfig"), - patch(f"{WP}.LoadConfig") as lc, - patch(f"{WP}.GraphOptimizationConfig"), - patch(f"{WP}.PlasAttentionConfig"), - patch(f"{WP}.EarlyStopConfig"), - patch(f"{WP}.StructuredOutputsConfig"), - patch(f"{WP}.RoutingReplayConfig"), - patch(f"{WP}.FDConfig") as fd, - ): - for a in ("is_cuda", "is_xpu", "is_maca", "is_iluvatar", "is_intel_hpu"): - getattr(plat, a).return_value = a == "is_cuda" - mc.return_value.num_hidden_layers = 2 - mc.return_value.architectures = ["LlamaForCausalLM"] - mc.return_value.is_quantized = False - mc.return_value.quantization_config = None - mc.return_value.head_dim = 128 - mc.return_value.pretrained_config = MagicMock() - pc.return_value.tensor_parallel_size = 1 - pc.return_value.data_parallel_size = 1 - pc.return_value.expert_parallel_size = 1 - pc.return_value.use_ep = False - lc.return_value.dynamic_load_weight = False - lc.return_value.load_strategy = "ipc_snapshot" - lc.return_value.rsync_config = None - lc.return_value.load_choices = "default_v1" - from fastdeploy.worker.worker_process import initialize_fd_config - - initialize_fd_config(args, ranks=1, local_rank=0) - fd.assert_called_once() - upd.assert_called_once() - - -# ── PaddleDisWorkerProc lifecycle ─────────────────────────────────────────── -class TestPaddleDisWorkerProc: - """End-to-end tests for the worker process class: init → services → model - lifecycle → kv cache → eplb → control methods → barriers.""" - - # -- constructor -- - def test_init_stores_attrs_and_chips(self, proc_factory): - p = proc_factory(ranks=2, local_rank=1) - assert p.ranks == 2 and p.local_rank == 1 and p.max_chips_per_node == 8 - proc_factory.gw.assert_called_once() - - proc_factory.plat.is_iluvatar.return_value = True - proc_factory.gw.reset_mock() - p2 = proc_factory() - assert p2.max_chips_per_node == 16 - - def test_init_speculative_and_overlap(self, proc_factory): - proc_factory.plat.is_cuda.return_value = True - p = proc_factory( + "parallel_config.pod_ip": "10.0.0.1", + "parallel_config.local_engine_worker_queue_port": 8888, + }, + ).start_task_queue_service() + assert tq.call_args[1]["address"] == ("10.0.0.1", 8888) + + +def test_load_model_and_graph(pw): + """load_model, init_device, graph_optimize_and_warm_up_model.""" + with patch(f"{WP}.IPCSignal") as ipc: + sig = MagicMock() + sig.value = np.zeros([1], dtype=np.int32) + ipc.return_value = sig + p = _make(pw) + p.load_model() + p.worker.load_model.assert_called_once() + assert p.loaded_model_signal.value[0] == 1 + # Distributed load + with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.paddle") as pdl: + sig = MagicMock() + sig.value = np.zeros([1], dtype=np.int32) + ipc.return_value = sig + _make(pw, ranks=2).load_model() + pdl.distributed.barrier.assert_called_once() + # init_device + graph optimize + _make(pw).init_device() + with patch(f"{WP}.envs") as env: + env.ENABLE_V1_KVCACHE_SCHEDULER = True + p2 = _make(pw) + p2.graph_optimize_and_warm_up_model() + p2.worker.graph_optimize_and_warm_up_model.assert_called_once() + # Splitwise prefill path + with patch(f"{WP}.envs") as env, patch(f"{WP}.IPCSignal"): + env.ENABLE_V1_KVCACHE_SCHEDULER = False + p3 = _make(pw, **{"scheduler_config.splitwise_role": "prefill"}) + p3.worker.model_runner.device_id = 0 + p3.graph_optimize_and_warm_up_model() + + +def test_kv_cache(pw): + """initialize_kv_cache: no-profile, profile, zero-memory, cap at 40000.""" + # No profile + p = _make(pw, **{"parallel_config.do_profile": False, "cache_config.total_block_num": 42}) + p.initialize_kv_cache() + p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=42) + # Profile: normal (reset worker mock to avoid cross-contamination) + _, gw = pw + gw.return_value = MagicMock() + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): + p2 = _make(pw, **{"parallel_config.do_profile": True}) + p2.worker.determine_available_memory.return_value = 1024**3 + p2.worker.cal_theortical_kvcache.return_value = 1024**2 + p2.initialize_kv_cache() + p2.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=1024) + # Zero memory → ValueError + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): + p3 = _make(pw, **{"parallel_config.do_profile": True}) + p3.worker.determine_available_memory.return_value = 0 + p3.worker.cal_theortical_kvcache.return_value = 1024 + with pytest.raises(ValueError): + p3.initialize_kv_cache() + # Capped at MAX_BLOCK_NUM (40000) + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): + p4 = _make(pw, **{"parallel_config.do_profile": True}) + p4.worker.initialize_cache.reset_mock() + p4.worker.determine_available_memory.return_value = 100 * 1024**3 + p4.worker.cal_theortical_kvcache.return_value = 1024 + p4.initialize_kv_cache() + p4.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=40000) + + +def test_control_method(pw): + """run_control_method: bad method, success, exception.""" + p = _make(pw) + p._ctrl_output = MagicMock() + p._ctrl_output.put = AsyncMock() + req = MagicMock() + req.request_id, req.method, req.args = "r1", "bad", {} + p.worker.bad = None + p.run_control_method(req) + # success + p.worker.do_it = MagicMock(return_value={"ok": True}) + req.method, req.args = "do_it", {"x": 1} + p.run_control_method(req) + p.worker.do_it.assert_called_once_with(x=1) + # exception + p.worker.fail = MagicMock(side_effect=RuntimeError("boom")) + req.method, req.args = "fail", {} + p.run_control_method(req) + + +def test_eplb(pw): + """_init_eplb_signal + _run_eplb.""" + # Disabled + p = _make(pw, **{"eplb_config.enable_eplb": False}) + p._init_eplb_signal() + assert not hasattr(p, "experts_manager") + p._run_eplb(tp_rank=0) + # Enabled init + with patch(f"{WP}.RedundantExpertManager") as rem, patch(f"{WP}.IPCSignal"), patch(f"{WP}.create_mmap"): + p2 = _make( + pw, **{ - "speculative_config.method": "eagle", - "scheduler_config.enable_overlap_schedule": True, - } + "eplb_config.enable_eplb": True, + "model_config.num_hidden_layers": 4, + "model_config.moe_num_experts": 8, + }, ) - assert p.speculative_decoding and not p.enable_overlap_schedule - - proc_factory.gw.reset_mock() - p2 = proc_factory(**{"scheduler_config.enable_overlap_schedule": True, "speculative_config.method": None}) - assert p2.enable_overlap_schedule - - # -- init_control + health_status + task_queue -- - def test_init_control_creates_fmq_queue(self, proc_factory): - with patch(f"{WP}.FMQ") as fmq: - p = proc_factory(**{"parallel_config.local_engine_worker_queue_port": 5555}) - p.local_rank = 3 - p.init_control() - fmq.return_value.queue.assert_called_once_with("ctrl_w2e_rank3_5555", "producer") - - def test_init_health_status_single_and_multi_dp(self, proc_factory): - with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.time") as t: - t.time.return_value = 1000.0 - proc_factory(**{"parallel_config.data_parallel_size": 1}).init_health_status() - single_count = ipc.call_count - - with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.time") as t, patch(f"{WP}.envs") as env: - t.time.return_value = 1000.0 - env.FD_ENABLE_MULTI_API_SERVER = False - sig = MagicMock() - sig.value.__getitem__ = MagicMock(return_value=1) - ipc.return_value = sig - proc_factory( - **{ - "parallel_config.data_parallel_size": 2, - "parallel_config.local_data_parallel_id": 0, - } - ).init_health_status() - assert ipc.call_count > single_count - - def test_task_queue_shm_and_tcp(self, proc_factory): - with patch(f"{WP}.envs") as env, patch(f"{WP}.TaskQueue") as tq: - env.FD_ENGINE_TASK_QUEUE_WITH_SHM = True - proc_factory(**{"parallel_config.local_engine_worker_queue_port": 7777}).start_task_queue_service() - assert "fd_task_queue_7777" in tq.call_args[1]["address"] - - with patch(f"{WP}.envs") as env, patch(f"{WP}.TaskQueue") as tq: - env.FD_ENGINE_TASK_QUEUE_WITH_SHM = False - proc_factory( - **{ - "parallel_config.pod_ip": "10.0.0.1", - "parallel_config.local_engine_worker_queue_port": 8888, - } - ).start_task_queue_service() - assert tq.call_args[1]["address"] == ("10.0.0.1", 8888) - - # -- model lifecycle -- - def test_load_model_and_init_device(self, proc_factory): - with patch(f"{WP}.IPCSignal") as ipc: - sig = MagicMock() - sig.value = np.zeros([1], dtype=np.int32) - ipc.return_value = sig - p = proc_factory() - p.load_model() - p.worker.load_model.assert_called_once() - assert p.loaded_model_signal.value[0] == 1 - - with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.paddle") as pdl: - sig = MagicMock() - sig.value = np.zeros([1], dtype=np.int32) - ipc.return_value = sig - proc_factory(ranks=2).load_model() - pdl.distributed.barrier.assert_called_once() - - proc_factory().init_device() - - def test_graph_optimize_and_splitwise(self, proc_factory): - with patch(f"{WP}.envs") as env: - env.ENABLE_V1_KVCACHE_SCHEDULER = True - p = proc_factory() - p.graph_optimize_and_warm_up_model() - p.worker.graph_optimize_and_warm_up_model.assert_called_once() - - with patch(f"{WP}.envs") as env, patch(f"{WP}.IPCSignal"): - env.ENABLE_V1_KVCACHE_SCHEDULER = False - p = proc_factory(**{"scheduler_config.splitwise_role": "prefill"}) - p.worker.model_runner.device_id = 0 - p.graph_optimize_and_warm_up_model() - - # -- kv cache -- - def test_kv_cache_no_profile(self, proc_factory): - p = proc_factory(**{"parallel_config.do_profile": False, "cache_config.total_block_num": 42}) - p.initialize_kv_cache() - p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=42) - - def test_kv_cache_profile_normal_and_cap(self, proc_factory): - with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): - p = proc_factory(**{"parallel_config.do_profile": True}) - p.worker.determine_available_memory.return_value = 1024**3 - p.worker.cal_theortical_kvcache.return_value = 1024**2 - p.initialize_kv_cache() - p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=1024) - - with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): - proc_factory.gw.reset_mock() - p = proc_factory(**{"parallel_config.do_profile": True}) - p.worker.determine_available_memory.return_value = 100 * 1024**3 - p.worker.cal_theortical_kvcache.return_value = 1024 - p.initialize_kv_cache() - p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=40000) - - def test_kv_cache_zero_blocks_raises(self, proc_factory): - with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): - p = proc_factory(**{"parallel_config.do_profile": True}) - p.worker.determine_available_memory.return_value = 0 - p.worker.cal_theortical_kvcache.return_value = 1024 - with pytest.raises(ValueError): - p.initialize_kv_cache() - - def test_kv_cache_multi_rank_all_reduces(self, proc_factory): - with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist") as dist, patch(f"{WP}.paddle") as pdl: - mock_t = MagicMock() - mock_t.item.return_value = 500 - pdl.full.return_value = mock_t - p = proc_factory(ranks=2, **{"parallel_config.do_profile": True}) - p.worker.determine_available_memory.return_value = 1024**3 - p.worker.cal_theortical_kvcache.return_value = 1024**2 - p.initialize_kv_cache() - dist.all_reduce.assert_called_once() - - # -- run_control_method -- - def test_control_method_success_and_errors(self, proc_factory): - p = proc_factory() - p._ctrl_output = MagicMock() - p._ctrl_output.put = AsyncMock() - - # unknown → 400 - p.worker.bad = None - req = MagicMock() - req.request_id, req.method, req.args = "r1", "bad", {} - p.run_control_method(req) - - # success → 200 - p.worker.do_it = MagicMock(return_value={"ok": True}) - req.method, req.args = "do_it", {"x": 1} - p.run_control_method(req) - p.worker.do_it.assert_called_once_with(x=1) - - # exception → 500 - p.worker.fail = MagicMock(side_effect=RuntimeError("boom")) - req.method, req.args = "fail", {} - p.run_control_method(req) - p.worker.fail.assert_called_once() - - # -- eplb -- - def test_eplb_disabled_and_enabled(self, proc_factory): - p = proc_factory(**{"eplb_config.enable_eplb": False}) - p._init_eplb_signal() - assert not hasattr(p, "experts_manager") - p._run_eplb(tp_rank=0) - - with patch(f"{WP}.RedundantExpertManager") as rem, patch(f"{WP}.IPCSignal"), patch(f"{WP}.create_mmap"): - p2 = proc_factory( - **{ - "eplb_config.enable_eplb": True, - "model_config.num_hidden_layers": 4, - "model_config.moe_num_experts": 8, - } - ) - p2._init_eplb_signal() - rem.assert_called_once() - - # -- barrier / broadcast / update_weights -- - def test_tp_barrier_default_and_xpu(self, proc_factory): - with patch(f"{WP}.paddle") as pdl: - p = proc_factory() - p.enable_overlap_schedule = False - p._tp_barrier_wait() - pdl.distributed.barrier.assert_called_once() - - proc_factory.plat.is_xpu.return_value = True - p2 = proc_factory() - p2.task_queue = MagicMock() - p2._tp_barrier_wait() - p2.task_queue.worker_process_tp_barrier.wait.assert_called_once() - - def test_broadcast_model_weights_signal(self, proc_factory): - with patch(f"{WP}.paddle") as pdl: - p = proc_factory(ranks=2) - p.model_weights_signal = np.array([42], dtype=np.int32) - mock_t = MagicMock() - mock_t.numpy.return_value = np.array([42]) - pdl.full.return_value = mock_t - assert p._broadcast_model_weights_signal(src=0, group=None) == 42 - - def test_update_weights_from_tensor(self, proc_factory): - with patch(f"{WP}.load_tensor_from_shm_mem") as load, patch(f"{WP}.MODEL_MAIN_NAME", "main"): - p = proc_factory() - p.experts_manager = MagicMock() - p.experts_manager.tensor_infos = {"x": 1} - p.experts_manager.get_ep_rank_to_expert_id_list.return_value = ([1], {0: 1}, 1) - load.return_value = {"w": MagicMock()} - p.update_weights_from_tensor({"main": "data"}) - load.assert_called_once() - assert p.experts_manager.tensor_infos is None + p2._init_eplb_signal() + rem.assert_called_once() + # Enabled run with token stats + weight update path + p3 = _make( + pw, + **{ + "eplb_config.enable_eplb": True, + "eplb_config.redundant_expert_dump_workload_interval": 10, + }, + ) + p3.last_dump_expert_workload_ts = 0 + stats = np.ones((4, 8), dtype=np.int32) + p3.local_experts_token_stats_array = MagicMock() + p3.local_experts_token_stats_array.value = stats.copy() + p3.signal_clear_experts_token_stats = MagicMock() + p3.signal_clear_experts_token_stats.value = np.array([1], dtype=np.int32) + p3.signal_update_weight_from_tensor_array = MagicMock() + p3.signal_update_weight_from_tensor_array.value = np.array([1], dtype=np.int32) + p3.rearrange_experts_signal = MagicMock() + p3.rearrange_experts_signal.value = np.zeros([1], dtype=np.int32) + p3.mmap_infos = {"main": "data"} + p3.worker.get_model().redundant_table_manger.get_expert_tokens_stats.return_value = ( + stats, + None, + None, + None, + ) + with patch(f"{WP}.time") as t, patch(f"{WP}.paddle") as pdl: + t.time.return_value = 100.0 + mock_data = MagicMock() + mock_data.__getitem__ = MagicMock(return_value=147183647) + pdl.to_tensor.return_value = mock_data + with patch.object(p3, "update_weights_from_tensor"): + p3._run_eplb(tp_rank=0) + assert p3.signal_clear_experts_token_stats.value[0] == 0 + assert p3.signal_update_weight_from_tensor_array.value[0] == 0 + + +def test_barrier_broadcast_update(pw): + """_tp_barrier_wait + _broadcast_model_weights_signal + update_weights.""" + with patch(f"{WP}.paddle") as pdl: + p = _make(pw) + p.enable_overlap_schedule = False + p._tp_barrier_wait() + pdl.distributed.barrier.assert_called_once() + # XPU barrier path + plat, gw = pw + plat.is_xpu.return_value = True + p2 = _make(pw) + p2.task_queue = MagicMock() + p2._tp_barrier_wait() + p2.task_queue.worker_process_tp_barrier.wait.assert_called_once() + plat.is_xpu.return_value = False + # broadcast + with patch(f"{WP}.paddle") as pdl: + p3 = _make(pw, ranks=2) + p3.model_weights_signal = np.array([42], dtype=np.int32) + mock_t = MagicMock() + mock_t.numpy.return_value = np.array([42]) + pdl.full.return_value = mock_t + assert p3._broadcast_model_weights_signal(src=0, group=None) == 42 + # update_weights_from_tensor + with patch(f"{WP}.load_tensor_from_shm_mem") as load, patch(f"{WP}.MODEL_MAIN_NAME", "main"): + p4 = _make(pw) + p4.experts_manager = MagicMock() + p4.experts_manager.tensor_infos = {"x": 1} + p4.experts_manager.get_ep_rank_to_expert_id_list.return_value = ([1], {0: 1}, 1) + load.return_value = {"w": MagicMock()} + p4.update_weights_from_tensor({"main": "data"}) + load.assert_called_once() + assert p4.experts_manager.tensor_infos is None + + +# -- run_worker_proc ----------------------------------------------------------- + + +def test_run_worker_proc(): + from fastdeploy.worker.worker_process import run_worker_proc + + with ( + patch(f"{WP}.parse_args"), + patch(f"{WP}.init_distributed_environment", return_value=(1, 0)), + patch(f"{WP}.initialize_fd_config"), + patch(f"{WP}.current_platform") as plat, + patch(f"{WP}.PaddleDisWorkerProc") as cls, + patch(f"{WP}.envs") as env, + ): + plat.is_iluvatar.return_value = False + env.FD_DETERMINISTIC_MODE = False + wp = MagicMock() + cls.return_value = wp + run_worker_proc() + cls.assert_called_once() + wp.init_control.assert_called_once() + wp.event_loop_normal.assert_called_once() + # Deterministic mode + cls.reset_mock() + env.FD_DETERMINISTIC_MODE = True + wp2 = MagicMock() + cls.return_value = wp2 + mock_biao = MagicMock() + with patch.dict("sys.modules", {"fastdeploy.model_executor.layers.batch_invariant_ops": mock_biao}): + run_worker_proc() + mock_biao.init_deterministic_mode.assert_called_once() + # Iluvatar path + cls.reset_mock() + plat.is_iluvatar.return_value = True + env.FD_DETERMINISTIC_MODE = False + mock_il = MagicMock() + with patch.dict("sys.modules", {"fastdeploy.worker.iluvatar_worker": mock_il}): + run_worker_proc() + mock_il.IluvatarPaddleDisWorkerProc.assert_called_once() From 60228424d5fe8ebce33d75e901199e2682a53a34 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Tue, 10 Mar 2026 12:24:16 +0800 Subject: [PATCH 3/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.36=E3=80=91improve=20worker=5Fprocess=20coverage=20to=2080%?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add test cases for: - EP config with int moe_num_experts (else branch) - Non-CUDA/XPU platform V1 scheduler fallback - v1_loader_support fallback to default - PaddleOCR architecture branch - num_hidden_layers=None ValueError --- tests/worker/test_worker_process.py | 101 ++++++++++++++++++++++++---- 1 file changed, 87 insertions(+), 14 deletions(-) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index 10ade3bb427..fc5c7b54510 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -99,6 +99,16 @@ def test_intercept_paddle_loggers(): # -- get_worker ---------------------------------------------------------------- +def test_get_worker_logprob_unsupported(): + from fastdeploy.worker.worker_process import get_worker + + with patch(f"{WP}.current_platform") as plat: + for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): + getattr(plat, a).return_value = False + with pytest.raises(NotImplementedError): + get_worker(_cfg(**{"model_config.enable_logprob": True}), local_rank=0, rank=0) + + @pytest.mark.parametrize( "platform,module_path,class_name", [ @@ -254,7 +264,7 @@ def test_initialize_fd_config(): initialize_fd_config(args, ranks=1, local_rank=0) m["FDConfig"].assert_called_once() m["update_fd_config_for_mm"].assert_called_once() - # EP + quant path + # EP + quant path (list moe_num_experts) args2, stack2, m2 = _fd_config_env() with stack2: m2["ParallelConfig"].return_value.data_parallel_size = 2 @@ -265,6 +275,67 @@ def test_initialize_fd_config(): m2["ModelConfig"].return_value.is_quantized = True initialize_fd_config(args2, ranks=2, local_rank=0) m2["FDConfig"].assert_called_once() + # EP with int moe_num_experts + num_local_experts=None → else branch + args3, stack3, m3 = _fd_config_env() + with stack3: + m3["ParallelConfig"].return_value.expert_parallel_size = 2 + m3["ModelConfig"].return_value.moe_num_experts = 8 + m3["ModelConfig"].return_value.num_local_experts = None + m3["EPLBConfig"].return_value.redundant_experts_num = 0 + initialize_fd_config(args3, ranks=1, local_rank=0) + # All platforms False → ENABLE_V1_KVCACHE_SCHEDULER set to 0 + args4, stack4, m4 = _fd_config_env() + with stack4: + for a in ("is_cuda", "is_xpu", "is_maca", "is_iluvatar", "is_intel_hpu"): + getattr(m4["current_platform"], a).return_value = False + initialize_fd_config(args4, ranks=1, local_rank=0) + # v1_loader_support returns False → load_choices fallback + args5, stack5, m5 = _fd_config_env() + with stack5: + m5["v1_loader_support"].return_value = False + fd = m5["FDConfig"].return_value + fd.load_config.load_choices = "default_v1" + fd.model_config.architectures = ["LlamaForCausalLM"] + initialize_fd_config(args5, ranks=1, local_rank=0) + # PaddleOCR architecture + args6, stack6, m6 = _fd_config_env() + with stack6: + fd6 = m6["FDConfig"].return_value + fd6.model_config.architectures = ["PaddleOCRForCausalLM"] + fd6.load_config.load_choices = "default" + initialize_fd_config(args6, ranks=1, local_rank=0) + # EP with num_local_experts int (not list, not None) → elif branch (L1089) + args_ep, stack_ep, m_ep = _fd_config_env() + with stack_ep: + m_ep["ParallelConfig"].return_value.expert_parallel_size = 2 + m_ep["ModelConfig"].return_value.moe_num_experts = 8 + m_ep["ModelConfig"].return_value.num_local_experts = 4 + m_ep["EPLBConfig"].return_value.redundant_experts_num = 0 + initialize_fd_config(args_ep, ranks=1, local_rank=0) + # Quant config present but not pre-quantized → online quant log (L1138) + args_q, stack_q, m_q = _fd_config_env() + with stack_q: + m_q["parse_quant_config"].return_value = MagicMock() + m_q["ModelConfig"].return_value.is_quantized = False + initialize_fd_config(args_q, ranks=1, local_rank=0) + # Splitwise prefill with V1 scheduler → PREFILL_NODE_ONE_STEP_STOP_V1="1" (L1159) + args_sp, stack_sp, m_sp = _fd_config_env() + with stack_sp, patch(f"{WP}.envs") as env_sp, patch.dict("os.environ", {}, clear=False): + env_sp.ENABLE_V1_KVCACHE_SCHEDULER = True + args_sp.splitwise_role = "prefill" + initialize_fd_config(args_sp, ranks=1, local_rank=0) + # Splitwise decode → PREFILL_NODE_ONE_STEP_STOP_V1="0" (L1161) + args_sd, stack_sd, m_sd = _fd_config_env() + with stack_sd, patch(f"{WP}.envs") as env_sd, patch.dict("os.environ", {}, clear=False): + env_sd.ENABLE_V1_KVCACHE_SCHEDULER = True + args_sd.splitwise_role = "decode" + initialize_fd_config(args_sd, ranks=1, local_rank=0) + # num_hidden_layers is None → ValueError + args7, stack7, m7 = _fd_config_env() + with stack7: + m7["ModelConfig"].return_value.num_hidden_layers = None + with pytest.raises(ValueError): + initialize_fd_config(args7, ranks=1, local_rank=0) # -- PaddleDisWorkerProc ------------------------------------------------------- @@ -281,18 +352,6 @@ def test_proc_init_and_control(pw): gw.reset_mock() assert _make(pw).max_chips_per_node == 16 plat.is_iluvatar.return_value = False - # Speculative + overlap - plat.is_cuda.return_value = True - gw.reset_mock() - p3 = _make( - pw, - **{ - "speculative_config.method": "eagle", - "scheduler_config.enable_overlap_schedule": True, - }, - ) - assert p3.speculative_decoding and not p3.enable_overlap_schedule - plat.is_cuda.return_value = False # init_control with patch(f"{WP}.FMQ") as fmq: p4 = _make(pw, **{"parallel_config.local_engine_worker_queue_port": 5555}) @@ -303,7 +362,7 @@ def test_proc_init_and_control(pw): def test_health_and_task_queue(pw): """init_health_status + start_task_queue_service.""" - with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.time") as t: + with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.IPCLock"), patch(f"{WP}.time") as t: t.time.return_value = 1000.0 _make(pw, **{"parallel_config.data_parallel_size": 1}).init_health_status() assert ipc.call_count > 0 @@ -372,6 +431,20 @@ def test_kv_cache(pw): p2.worker.cal_theortical_kvcache.return_value = 1024**2 p2.initialize_kv_cache() p2.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=1024) + # Multi-rank profile → dist.all_reduce path (L626-628) + _, gw2 = pw + gw2.return_value = MagicMock() + with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist") as d2: + p2r = _make(pw, ranks=2, **{"parallel_config.do_profile": True}) + p2r.worker.determine_available_memory.return_value = 1024**3 + p2r.worker.cal_theortical_kvcache.return_value = 1024**2 + mock_t2 = MagicMock() + mock_t2.item.return_value = 512 + d2.all_reduce.return_value = None + with patch(f"{WP}.paddle") as pdl2: + pdl2.full.return_value = mock_t2 + p2r.initialize_kv_cache() + d2.all_reduce.assert_called_once() # Zero memory → ValueError with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): p3 = _make(pw, **{"parallel_config.do_profile": True}) From 98506a1a0df2237bb3d46ff6dc96aceb1d1514cb Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 13 Mar 2026 14:09:36 +0800 Subject: [PATCH 4/8] refactor: replace all MagicMock with SimpleNamespace + real objects - Replace 33 MagicMock instances with types.SimpleNamespace, real numpy arrays, and real callables (lambda/async def) - Remove unittest.mock.MagicMock and AsyncMock imports - Keep only unittest.mock.patch for module-level substitution - All 21 tests pass, coverage unchanged at 80% --- tests/worker/test_worker_process.py | 169 +++++++++++++++------------- 1 file changed, 92 insertions(+), 77 deletions(-) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index fc5c7b54510..34a8577606f 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -14,8 +14,9 @@ import logging import sys +import types from contextlib import ExitStack -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import patch import numpy as np import pytest @@ -27,31 +28,42 @@ def _cfg(**overrides): - """Minimal FDConfig-like MagicMock.""" - c = MagicMock() - c.parallel_config.local_engine_worker_queue_port = 9999 - c.parallel_config.tensor_parallel_size = 1 - c.parallel_config.tensor_parallel_rank = 0 - c.parallel_config.data_parallel_size = 1 - c.parallel_config.data_parallel_rank = 0 - c.parallel_config.local_data_parallel_id = 0 - c.parallel_config.engine_pid = 12345 - c.parallel_config.pod_ip = "127.0.0.1" - c.parallel_config.use_ep = False - c.parallel_config.expert_parallel_size = 1 - c.parallel_config.tp_group = MagicMock() - c.parallel_config.shutdown_comm_group_if_worker_idle = False - c.cache_config.num_cpu_blocks = 0 - c.cache_config.total_block_num = 100 - c.scheduler_config.enable_overlap_schedule = False - c.scheduler_config.splitwise_role = "mixed" - c.speculative_config.method = None - c.eplb_config.enable_eplb = False - c.load_config.dynamic_load_weight = False - c.model_config.enable_mm = False - c.model_config.enable_logprob = False - c.model_config.architectures = ["LlamaForCausalLM"] - c.nnode = 1 + """Minimal FDConfig-like namespace.""" + c = types.SimpleNamespace( + parallel_config=types.SimpleNamespace( + local_engine_worker_queue_port=9999, + tensor_parallel_size=1, + tensor_parallel_rank=0, + data_parallel_size=1, + data_parallel_rank=0, + local_data_parallel_id=0, + engine_pid=12345, + pod_ip="127.0.0.1", + use_ep=False, + expert_parallel_size=1, + tp_group=None, + shutdown_comm_group_if_worker_idle=False, + ), + cache_config=types.SimpleNamespace( + num_cpu_blocks=0, + total_block_num=100, + ), + scheduler_config=types.SimpleNamespace( + enable_overlap_schedule=False, + splitwise_role="mixed", + ), + speculative_config=types.SimpleNamespace(method=None), + eplb_config=types.SimpleNamespace(enable_eplb=False), + load_config=types.SimpleNamespace(dynamic_load_weight=False), + model_config=types.SimpleNamespace( + enable_mm=False, + enable_logprob=False, + architectures=["LlamaForCausalLM"], + tensor_model_parallel_size=1, + vision_config=types.SimpleNamespace(dtype=None), + ), + nnode=1, + ) for k, v in overrides.items(): parts = k.split(".") obj = c @@ -128,9 +140,8 @@ def test_get_worker(platform, module_path, class_name): for a in ("is_dcu", "is_cuda", "is_xpu", "is_iluvatar", "is_gcu", "is_maca", "is_intel_hpu"): getattr(plat, a).return_value = False getattr(plat, platform).return_value = True - mock_mod = MagicMock() - sentinel = MagicMock() - setattr(mock_mod, class_name, MagicMock(return_value=sentinel)) + sentinel = object() + mock_mod = types.SimpleNamespace(**{class_name: lambda *a, **kw: sentinel}) with patch.dict("sys.modules", {module_path: mock_mod}): assert get_worker(_cfg(), local_rank=0, rank=1) is sentinel @@ -240,7 +251,7 @@ def _fd_config_env(): mc.is_quantized = False mc.quantization_config = None mc.head_dim = 128 - mc.pretrained_config = MagicMock() + mc.pretrained_config = types.SimpleNamespace() pc = m["ParallelConfig"].return_value pc.tensor_parallel_size = 1 pc.data_parallel_size = 1 @@ -271,7 +282,7 @@ def test_initialize_fd_config(): m2["ParallelConfig"].return_value.expert_parallel_size = 2 m2["ModelConfig"].return_value.moe_num_experts = [8] m2["EPLBConfig"].return_value.redundant_experts_num = 0 - m2["parse_quant_config"].return_value = MagicMock() + m2["parse_quant_config"].return_value = types.SimpleNamespace() m2["ModelConfig"].return_value.is_quantized = True initialize_fd_config(args2, ranks=2, local_rank=0) m2["FDConfig"].assert_called_once() @@ -315,7 +326,7 @@ def test_initialize_fd_config(): # Quant config present but not pre-quantized → online quant log (L1138) args_q, stack_q, m_q = _fd_config_env() with stack_q: - m_q["parse_quant_config"].return_value = MagicMock() + m_q["parse_quant_config"].return_value = types.SimpleNamespace() m_q["ModelConfig"].return_value.is_quantized = False initialize_fd_config(args_q, ranks=1, local_rank=0) # Splitwise prefill with V1 scheduler → PREFILL_NODE_ONE_STEP_STOP_V1="1" (L1159) @@ -387,18 +398,14 @@ def test_health_and_task_queue(pw): def test_load_model_and_graph(pw): """load_model, init_device, graph_optimize_and_warm_up_model.""" with patch(f"{WP}.IPCSignal") as ipc: - sig = MagicMock() - sig.value = np.zeros([1], dtype=np.int32) - ipc.return_value = sig + ipc.return_value = types.SimpleNamespace(value=np.zeros([1], dtype=np.int32)) p = _make(pw) p.load_model() p.worker.load_model.assert_called_once() assert p.loaded_model_signal.value[0] == 1 # Distributed load with patch(f"{WP}.IPCSignal") as ipc, patch(f"{WP}.paddle") as pdl: - sig = MagicMock() - sig.value = np.zeros([1], dtype=np.int32) - ipc.return_value = sig + ipc.return_value = types.SimpleNamespace(value=np.zeros([1], dtype=np.int32)) _make(pw, ranks=2).load_model() pdl.distributed.barrier.assert_called_once() # init_device + graph optimize @@ -424,7 +431,7 @@ def test_kv_cache(pw): p.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=42) # Profile: normal (reset worker mock to avoid cross-contamination) _, gw = pw - gw.return_value = MagicMock() + gw.return_value.reset_mock() with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist"): p2 = _make(pw, **{"parallel_config.do_profile": True}) p2.worker.determine_available_memory.return_value = 1024**3 @@ -433,16 +440,14 @@ def test_kv_cache(pw): p2.worker.initialize_cache.assert_called_once_with(num_gpu_blocks=1024) # Multi-rank profile → dist.all_reduce path (L626-628) _, gw2 = pw - gw2.return_value = MagicMock() + gw2.return_value.reset_mock() with patch(f"{WP}.IPCSignal"), patch(f"{WP}.dist") as d2: p2r = _make(pw, ranks=2, **{"parallel_config.do_profile": True}) p2r.worker.determine_available_memory.return_value = 1024**3 p2r.worker.cal_theortical_kvcache.return_value = 1024**2 - mock_t2 = MagicMock() - mock_t2.item.return_value = 512 d2.all_reduce.return_value = None with patch(f"{WP}.paddle") as pdl2: - pdl2.full.return_value = mock_t2 + pdl2.full.return_value = types.SimpleNamespace(item=lambda: 512) p2r.initialize_kv_cache() d2.all_reduce.assert_called_once() # Zero memory → ValueError @@ -465,19 +470,21 @@ def test_kv_cache(pw): def test_control_method(pw): """run_control_method: bad method, success, exception.""" p = _make(pw) - p._ctrl_output = MagicMock() - p._ctrl_output.put = AsyncMock() - req = MagicMock() - req.request_id, req.method, req.args = "r1", "bad", {} + + async def _noop_put(*a, **kw): + pass + + p._ctrl_output = types.SimpleNamespace(put=_noop_put) + req = types.SimpleNamespace(request_id="r1", method="bad", args={}) p.worker.bad = None p.run_control_method(req) # success - p.worker.do_it = MagicMock(return_value={"ok": True}) + p.worker.do_it.return_value = {"ok": True} req.method, req.args = "do_it", {"x": 1} p.run_control_method(req) p.worker.do_it.assert_called_once_with(x=1) # exception - p.worker.fail = MagicMock(side_effect=RuntimeError("boom")) + p.worker.fail.side_effect = RuntimeError("boom") req.method, req.args = "fail", {} p.run_control_method(req) @@ -511,14 +518,10 @@ def test_eplb(pw): ) p3.last_dump_expert_workload_ts = 0 stats = np.ones((4, 8), dtype=np.int32) - p3.local_experts_token_stats_array = MagicMock() - p3.local_experts_token_stats_array.value = stats.copy() - p3.signal_clear_experts_token_stats = MagicMock() - p3.signal_clear_experts_token_stats.value = np.array([1], dtype=np.int32) - p3.signal_update_weight_from_tensor_array = MagicMock() - p3.signal_update_weight_from_tensor_array.value = np.array([1], dtype=np.int32) - p3.rearrange_experts_signal = MagicMock() - p3.rearrange_experts_signal.value = np.zeros([1], dtype=np.int32) + p3.local_experts_token_stats_array = types.SimpleNamespace(value=stats.copy()) + p3.signal_clear_experts_token_stats = types.SimpleNamespace(value=np.array([1], dtype=np.int32)) + p3.signal_update_weight_from_tensor_array = types.SimpleNamespace(value=np.array([1], dtype=np.int32)) + p3.rearrange_experts_signal = types.SimpleNamespace(value=np.zeros([1], dtype=np.int32)) p3.mmap_infos = {"main": "data"} p3.worker.get_model().redundant_table_manger.get_expert_tokens_stats.return_value = ( stats, @@ -528,9 +531,7 @@ def test_eplb(pw): ) with patch(f"{WP}.time") as t, patch(f"{WP}.paddle") as pdl: t.time.return_value = 100.0 - mock_data = MagicMock() - mock_data.__getitem__ = MagicMock(return_value=147183647) - pdl.to_tensor.return_value = mock_data + pdl.to_tensor.return_value = np.array([147183647]) with patch.object(p3, "update_weights_from_tensor"): p3._run_eplb(tp_rank=0) assert p3.signal_clear_experts_token_stats.value[0] == 0 @@ -548,25 +549,27 @@ def test_barrier_broadcast_update(pw): plat, gw = pw plat.is_xpu.return_value = True p2 = _make(pw) - p2.task_queue = MagicMock() + _barrier_waited = [] + p2.task_queue = types.SimpleNamespace( + worker_process_tp_barrier=types.SimpleNamespace(wait=lambda: _barrier_waited.append(1)) + ) p2._tp_barrier_wait() - p2.task_queue.worker_process_tp_barrier.wait.assert_called_once() + assert len(_barrier_waited) == 1 plat.is_xpu.return_value = False # broadcast with patch(f"{WP}.paddle") as pdl: p3 = _make(pw, ranks=2) p3.model_weights_signal = np.array([42], dtype=np.int32) - mock_t = MagicMock() - mock_t.numpy.return_value = np.array([42]) - pdl.full.return_value = mock_t + pdl.full.return_value = types.SimpleNamespace(numpy=lambda: np.array([42])) assert p3._broadcast_model_weights_signal(src=0, group=None) == 42 # update_weights_from_tensor with patch(f"{WP}.load_tensor_from_shm_mem") as load, patch(f"{WP}.MODEL_MAIN_NAME", "main"): p4 = _make(pw) - p4.experts_manager = MagicMock() - p4.experts_manager.tensor_infos = {"x": 1} - p4.experts_manager.get_ep_rank_to_expert_id_list.return_value = ([1], {0: 1}, 1) - load.return_value = {"w": MagicMock()} + p4.experts_manager = types.SimpleNamespace( + tensor_infos={"x": 1}, + get_ep_rank_to_expert_id_list=lambda: ([1], {0: 1}, 1), + ) + load.return_value = {"w": np.zeros(1)} p4.update_weights_from_tensor({"main": "data"}) load.assert_called_once() assert p4.experts_manager.tensor_infos is None @@ -588,8 +591,7 @@ def test_run_worker_proc(): ): plat.is_iluvatar.return_value = False env.FD_DETERMINISTIC_MODE = False - wp = MagicMock() - cls.return_value = wp + wp = cls.return_value run_worker_proc() cls.assert_called_once() wp.init_control.assert_called_once() @@ -597,17 +599,30 @@ def test_run_worker_proc(): # Deterministic mode cls.reset_mock() env.FD_DETERMINISTIC_MODE = True - wp2 = MagicMock() - cls.return_value = wp2 - mock_biao = MagicMock() + _dm_calls = [] + mock_biao = types.SimpleNamespace(init_deterministic_mode=lambda: _dm_calls.append(1)) with patch.dict("sys.modules", {"fastdeploy.model_executor.layers.batch_invariant_ops": mock_biao}): run_worker_proc() - mock_biao.init_deterministic_mode.assert_called_once() + assert len(_dm_calls) == 1 # Iluvatar path cls.reset_mock() plat.is_iluvatar.return_value = True env.FD_DETERMINISTIC_MODE = False - mock_il = MagicMock() + _il_calls = [] + + def _make_il_wp(*a, **kw): + _il_calls.append(1) + return types.SimpleNamespace( + init_device=lambda: None, + load_model=lambda: None, + initialize_kv_cache=lambda: None, + graph_optimize_and_warm_up_model=lambda: None, + init_health_status=lambda: None, + start_task_queue_service=lambda: None, + event_loop_normal=lambda: None, + ) + + mock_il = types.SimpleNamespace(IluvatarPaddleDisWorkerProc=_make_il_wp) with patch.dict("sys.modules", {"fastdeploy.worker.iluvatar_worker": mock_il}): run_worker_proc() - mock_il.IluvatarPaddleDisWorkerProc.assert_called_once() + assert len(_il_calls) == 1 From 31d6ec55e7e1322d18668bebd4ee435552b254c2 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 13 Mar 2026 17:41:31 +0800 Subject: [PATCH 5/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.36=E3=80=91add=20=5F=5Fmain=5F=5F=20block?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/worker/test_worker_process.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index 34a8577606f..e8e97730f43 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -626,3 +626,7 @@ def _make_il_wp(*a, **kw): with patch.dict("sys.modules", {"fastdeploy.worker.iluvatar_worker": mock_il}): run_worker_proc() assert len(_il_calls) == 1 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From b5bab0fe17f81cd3620659bb5d36333e05c19dd8 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 20 Mar 2026 00:31:11 +0800 Subject: [PATCH 6/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.36=E3=80=91cover=20dp=20health=20init=20and=20weight=20wait=20?= =?UTF-8?q?loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/worker/test_worker_process.py | 63 +++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index e8e97730f43..6f64a033ad6 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -395,6 +395,37 @@ def test_health_and_task_queue(pw): assert tq.call_args[1]["address"] == ("10.0.0.1", 8888) +def test_init_health_status_data_parallel_branch(pw): + """init_health_status: data_parallel_size > 1 path with launched signal wait.""" + + class _Sig: + def __init__(self, value): + self.value = value + + calls = [] + + def _mk_signal(name=None, array=None, **kwargs): + calls.append(name) + if name == "launched_expert_service_signal": + return _Sig(np.array([1, 0], dtype=np.int32)) + return _Sig(np.array(array, copy=True)) + + with patch(f"{WP}.envs") as env, patch(f"{WP}.IPCSignal", side_effect=_mk_signal), patch(f"{WP}.IPCLock"): + env.FD_ENABLE_MULTI_API_SERVER = False + p = _make( + pw, + **{ + "parallel_config.data_parallel_size": 2, + "parallel_config.local_data_parallel_id": 0, + "parallel_config.tensor_parallel_size": 2, + "nnode": 1, + }, + ) + p.init_health_status() + assert "launched_expert_service_signal" in calls + assert p.worker_ready_signal.value[0] == 1 + + def test_load_model_and_graph(pw): """load_model, init_device, graph_optimize_and_warm_up_model.""" with patch(f"{WP}.IPCSignal") as ipc: @@ -575,6 +606,38 @@ def test_barrier_broadcast_update(pw): assert p4.experts_manager.tensor_infos is None +def test_update_weights_from_tensor_waits_once(pw): + """update_weights_from_tensor: waits when tensor infos are initially unavailable.""" + + class _TensorInfos: + def __init__(self): + self._values = [None, {"x": 1}, {"x": 1}] + + @property + def tensor_infos(self): + if len(self._values) > 1: + return self._values.pop(0) + return self._values[0] + + @tensor_infos.setter + def tensor_infos(self, value): + self._values = [value] + + tm = _TensorInfos() + with ( + patch(f"{WP}.load_tensor_from_shm_mem") as load, + patch(f"{WP}.MODEL_MAIN_NAME", "main"), + patch("time.sleep") as sleep_mock, + ): + p = _make(pw) + p.experts_manager = tm + p.experts_manager.get_ep_rank_to_expert_id_list = lambda: ([1], {0: 1}, 1) + load.return_value = {"w": np.zeros(1)} + p.update_weights_from_tensor({"main": "data"}) + sleep_mock.assert_called_once() + assert p.experts_manager.tensor_infos is None + + # -- run_worker_proc ----------------------------------------------------------- From 591ae0c400e499dfb722e519085abc5792805b3f Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 20 Mar 2026 00:52:45 +0800 Subject: [PATCH 7/8] [CI] retrigger after clone-stage infra failure From aeee5b114d2df6e6e65c55d62d476cbc29f2496b Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 20 Mar 2026 13:50:08 +0800 Subject: [PATCH 8/8] =?UTF-8?q?[CI]=E3=80=90Hackathon=2010th=20Spring=20No?= =?UTF-8?q?.36=E3=80=91boost=20coverage:=20event=5Floop=5Fnormal,=20kvcach?= =?UTF-8?q?e=20lock,=20eplb=20elif=20=E2=80=94=20delta=20122=E2=86=92152?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/worker/test_worker_process.py | 126 ++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/tests/worker/test_worker_process.py b/tests/worker/test_worker_process.py index 6f64a033ad6..ea06a00413e 100644 --- a/tests/worker/test_worker_process.py +++ b/tests/worker/test_worker_process.py @@ -691,5 +691,131 @@ def _make_il_wp(*a, **kw): assert len(_il_calls) == 1 +# -- kvcache lock + eplb elif + event_loop_normal ------------------------------ + + +class _BreakLoop(Exception): + """Break out of event_loop_normal's while-True.""" + + +class _FakeCtrlReq: + """Duck-typed ControlRequest for isinstance checks.""" + + def __init__(self, **kw): + self.__dict__.update(kw) + + +def _loop_proc(pw, max_iter=1, **extra_cfg): + """PaddleDisWorkerProc wired for event_loop_normal.""" + defaults = { + "eplb_config.enable_eplb": False, + "load_config.dynamic_load_weight": False, + "parallel_config.tensor_parallel_size": 1, + "parallel_config.use_ep": False, + "cache_config.kvcache_storage_backend": None, + } + defaults.update(extra_cfg) + p = _make(pw, **defaults) + p.worker_healthy_live_signal = types.SimpleNamespace(value=np.zeros([8], dtype=np.int32)) + p.exist_task_signal = types.SimpleNamespace(value=np.array([0], dtype=np.int32)) + p.exist_prefill_task_signal = types.SimpleNamespace(value=np.array([0], dtype=np.int32)) + p.task_queue = types.SimpleNamespace( + exist_tasks=lambda: False, + get_tasks=lambda: ([], False), + read_finish_flag=types.SimpleNamespace(get=lambda: 0, set=lambda v: None), + num_tasks=lambda: 0, + clear_data=lambda: None, + ) + p.worker.model_runner = types.SimpleNamespace(not_need_stop=lambda: True, current_launch_token_num=1) + p.worker.execute_model = lambda *a, **kw: None + p.worker.exist_prefill = lambda: False + p.worker.preprocess_new_task = lambda *a, **kw: None + _iter = [0] + + def _counting_eplb(tp_rank): + _iter[0] += 1 + if _iter[0] > max_iter: + raise _BreakLoop + + p._run_eplb = _counting_eplb + return p + + +def test_kvcache_lock(pw): + """_acquire/_release_kvcache_lock with lock enabled.""" + with patch(f"{WP}.envs") as env: + env.FD_USE_KVCACHE_LOCK = True + p = _make(pw) + p.gpu_cache_lock = types.SimpleNamespace(acquire=lambda: None, release=lambda: None) + p._acquire_kvcache_lock(0) + p._release_kvcache_lock(0) + + +def test_run_eplb_token_stats_none(pw): + """_run_eplb: elif branch when token stats value is None.""" + p = _make( + pw, + **{ + "eplb_config.enable_eplb": True, + "eplb_config.redundant_expert_dump_workload_interval": 10, + }, + ) + p.last_dump_expert_workload_ts = 0 + p.local_experts_token_stats_array = types.SimpleNamespace(value=None) + p.signal_update_weight_from_tensor_array = types.SimpleNamespace(value=np.array([0], dtype=np.int32)) + p.rearrange_experts_signal = types.SimpleNamespace(value=np.zeros([1], dtype=np.int32)) + p.mmap_infos = {} + with patch(f"{WP}.time") as t, patch(f"{WP}.paddle") as pdl: + t.time.return_value = 100.0 + pdl.to_tensor.return_value = np.array([0]) + p._run_eplb(tp_rank=0) + + +def test_event_loop_control_req(pw): + """event_loop_normal: ControlRequest processing + exist_prefill.""" + p = _loop_proc(pw) + p.exist_task_signal.value[0] = 1 # ExistTaskStatus.EXIST + ctrl = _FakeCtrlReq(request_id="r1", method="noop", args={}) + p.task_queue.get_tasks = lambda: ([([ctrl], 1)], True) + p.run_control_method = lambda req: None + with patch(f"{WP}.ControlRequest", _FakeCtrlReq), patch(f"{WP}.envs") as env: + env.ENABLE_V1_KVCACHE_SCHEDULER = False + env.FD_USE_KVCACHE_LOCK = False + with pytest.raises(_BreakLoop): + p.event_loop_normal() + + +def test_event_loop_not_need_stop(pw): + """event_loop_normal: not_need_stop early continue path.""" + p = _loop_proc(pw) + p.worker.model_runner.not_need_stop = lambda: False + with patch(f"{WP}.envs") as env: + env.ENABLE_V1_KVCACHE_SCHEDULER = True + env.FD_USE_KVCACHE_LOCK = False + with pytest.raises(_BreakLoop): + p.event_loop_normal() + + +def test_event_loop_dynamic_weight(pw): + """event_loop_normal: dynamic_load_weight UPDATING path.""" + p = _loop_proc(pw, **{"load_config.dynamic_load_weight": True}) + p.model_weights_status = types.SimpleNamespace(value=np.array([1], dtype=np.int32)) # UPDATING + p.kv_cache_status = types.SimpleNamespace(value=np.array([0], dtype=np.int32)) + mock_rl = types.ModuleType("fastdeploy.rl") + mock_dwm = types.ModuleType("fastdeploy.rl.dynamic_weight_manager") + mock_dwm.DynamicWeightManager = types.SimpleNamespace(check_model_weights_status=lambda *a, **kw: None) + with ( + patch(f"{WP}.envs") as env, + patch.dict( + "sys.modules", + {"fastdeploy.rl": mock_rl, "fastdeploy.rl.dynamic_weight_manager": mock_dwm}, + ), + ): + env.ENABLE_V1_KVCACHE_SCHEDULER = False + env.FD_USE_KVCACHE_LOCK = False + with pytest.raises(_BreakLoop): + p.event_loop_normal() + + if __name__ == "__main__": pytest.main([__file__, "-v"])