diff --git a/conftest.py b/conftest.py index 820b9f47..fd8fea5d 100644 --- a/conftest.py +++ b/conftest.py @@ -1,5 +1,6 @@ from __future__ import annotations +import base64 import json import logging import os @@ -19,6 +20,10 @@ if TYPE_CHECKING: from kubernetes.dynamic import DynamicClient + +from ocp_resources.cluster_role import ClusterRole +from ocp_resources.cluster_role_binding import ClusterRoleBinding +from ocp_resources.role_binding import RoleBinding from ocp_resources.forklift_controller import ForkliftController from ocp_resources.namespace import Namespace from ocp_resources.network_attachment_definition import NetworkAttachmentDefinition @@ -27,13 +32,14 @@ from ocp_resources.provider import Provider from ocp_resources.resource import ResourceEditor from ocp_resources.secret import Secret +from ocp_resources.service_account import ServiceAccount from ocp_resources.storage_class import StorageClass from ocp_resources.storage_profile import StorageProfile from ocp_resources.subscription import Subscription from ocp_resources.virtual_machine import VirtualMachine from pytest_harvest import get_fixture_store from pytest_testconfig import config as py_config -from timeout_sampler import TimeoutSampler +from timeout_sampler import TimeoutExpiredError, TimeoutSampler from exceptions.exceptions import ( ForkliftPodsNotRunningError, @@ -558,6 +564,10 @@ def virtctl_binary(ocp_admin_client: "DynamicClient") -> Path: PermissionError: If shared directory is a symlink (hijack attempt). TimeoutError: If timeout waiting for file lock. """ + # if env variable VIRTCTL_PATH is set, use it. + if virtctl_env_path := os.environ.get("VIRTCTL_PATH"): + return Path(virtctl_env_path) + # Get cluster version for versioned caching cluster_version_str = get_cluster_version_str(ocp_admin_client) @@ -952,6 +962,206 @@ def destination_ocp_provider(fixture_store, destination_ocp_secret, ocp_admin_cl yield OCPProvider(ocp_resource=provider, fixture_store=fixture_store) +# Existing ClusterRole name from operator/PR - test verifies migration with this role only. +# Equivalent to: oc create clusterrolebinding ... --clusterrole=forklift-migrator-role +FORKLIFT_MIGRATOR_ROLE_NAME = "forklift-migrator-role" + + +@pytest.fixture(scope="session") +def clusterrole_destination_ocp_provider( + fixture_store: dict[str, Any], + ocp_admin_client: "DynamicClient", + session_uuid: str, + target_namespace: str, + mtv_namespace: str, +) -> OCPProvider: + """Create a token-based OCP provider using the existing forklift-migrator-role and a fresh SA. + + Verifies the flow: + 1. Create a fresh ServiceAccount (in MTV operator namespace) + 2. Bind it ONLY to the existing ClusterRole forklift-migrator-role (from operator/PR) + 3. Create a token for that SA (equivalent to: oc create token -n ) + 4. Create Forklift Provider CR using that token (in target namespace) + + Does NOT create the ClusterRole; forklift-migrator-role must already exist in the cluster. + + Args: + fixture_store (dict[str, Any]): Fixture store for resource tracking and teardown. + ocp_admin_client (DynamicClient): OpenShift DynamicClient for cluster operations. + session_uuid (str): Unique session identifier for resource naming. + target_namespace (str): Namespace for provider resources (Provider CR, provider secret). + mtv_namespace (str): MTV operator namespace for ServiceAccount and token. + + Returns: + OCPProvider: Token-based OCP provider bound to forklift-migrator-role. + + Raises: + ValueError: If the SA token is not populated within 60s. + """ + sa_name = f"{session_uuid}-forklift-migrator-sa" + binding_name = f"{session_uuid}-forklift-migrator-binding" + token_secret_name = f"{session_uuid}-clusterrole-token" + provider_name = f"{session_uuid}-clusterrole-destination-ocp-provider" + + # 1. Create a fresh ServiceAccount in MTV operator namespace + create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=ServiceAccount, + name=sa_name, + namespace=mtv_namespace, + ) + + # 2. Bind it ONLY to the existing forklift-migrator-role (from PR) + cluster_role_binding = create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=ClusterRoleBinding, + name=binding_name, + cluster_role=FORKLIFT_MIGRATOR_ROLE_NAME, + subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": mtv_namespace}], + ) + cluster_role_binding.wait() + + # Token secret: create Secret with type service-account-token so cluster populates token + create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=Secret, + name=token_secret_name, + namespace=mtv_namespace, + type="kubernetes.io/service-account-token", + annotations={"kubernetes.io/service-account.name": sa_name}, + ) + + # Wait for token to be populated (controller fills it asynchronously) + token_secret_ref = Secret( + client=ocp_admin_client, + name=token_secret_name, + namespace=mtv_namespace, + ) + + def _has_token() -> str | None: + token_secret_ref.wait() + return (token_secret_ref.instance.data or {}).get("token") + + token_b64 = None + try: + for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token): + if sample: + token_b64 = sample + break + if not token_b64: + raise ValueError( + f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s" + ) + except TimeoutExpiredError: + raise ValueError( + f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s" + ) from None + + token_value = base64.b64decode(token_b64).decode("utf-8") + + # Provider secret (Forklift expects "token" and "insecureSkipVerify") - in target namespace + provider_secret = create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=Secret, + name=f"{provider_name}-secret", + namespace=target_namespace, + string_data={"token": token_value, "insecureSkipVerify": "true"}, + ) + + # Provider CR - in target namespace + provider = create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=Provider, + name=provider_name, + namespace=target_namespace, + secret_name=provider_secret.name, + secret_namespace=provider_secret.namespace, + url=ocp_admin_client.configuration.host, + provider_type=Provider.ProviderType.OPENSHIFT, + ) + + return OCPProvider(ocp_resource=provider, fixture_store=fixture_store) + + +@pytest.fixture(scope="class") +def forklift_scc_binding( + fixture_store: dict[str, Any], + ocp_admin_client: "DynamicClient", + session_uuid: str, + target_namespace: str, +) -> Generator[RoleBinding, None, None]: + """Create ClusterRole and RoleBinding to grant forklift-controller-scc SCC to the default SA. + + Class-scoped so that resources are created per test class and torn down after + the class completes. This ensures "without SCC" test classes run against a + clean state where neither the ClusterRole nor the RoleBinding exist. + + The ``oc adm policy add-scc-to-user`` command creates two resources: + 1. A ClusterRole (``system:openshift:scc:forklift-controller-scc``) with a rule + granting the ``use`` verb on the SCC. + 2. A RoleBinding in the target namespace that binds the default ServiceAccount + to that ClusterRole. + + Migration pods (guest conversion) run under the namespace's default ServiceAccount, + not the provider's token SA. The SCC must be granted to this default SA so the + forklift controller can create conversion pods with the required security context. + + Equivalent to: oc adm policy add-scc-to-user forklift-controller-scc -z default -n + + Resources are also tracked in ``fixture_store["teardown"]`` via + ``create_and_store_resource()`` as a safety net; session-level teardown will + silently skip already-deleted resources. + + Args: + fixture_store (dict[str, Any]): Fixture store for resource tracking and teardown. + ocp_admin_client (DynamicClient): OpenShift DynamicClient for cluster operations. + session_uuid (str): Unique session identifier for resource naming. + target_namespace (str): Namespace where migration pods run. + + Yields: + RoleBinding: The created RoleBinding resource. + """ + scc_cluster_role_name = "system:openshift:scc:forklift-controller-scc" + + scc_cluster_role = create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=ClusterRole, + name=scc_cluster_role_name, + rules=[ + { + "apiGroups": ["security.openshift.io"], + "resourceNames": ["forklift-controller-scc"], + "resources": ["securitycontextconstraints"], + "verbs": ["use"], + } + ], + ) + + role_binding = create_and_store_resource( + client=ocp_admin_client, + fixture_store=fixture_store, + resource=RoleBinding, + name=f"{session_uuid}-forklift-scc-binding", + namespace=target_namespace, + role_ref_kind="ClusterRole", + role_ref_name=scc_cluster_role_name, + subjects_kind="ServiceAccount", + subjects_name="default", + subjects_namespace=target_namespace, + ) + + yield role_binding + + role_binding.clean_up() + scc_cluster_role.clean_up() + + @pytest.fixture(scope="class") def class_plan_config(request: pytest.FixtureRequest) -> dict[str, Any]: """Get plan configuration for class-based tests. diff --git a/tests/cold/test_mtv_cold_clusterrole_migration.py b/tests/cold/test_mtv_cold_clusterrole_migration.py new file mode 100644 index 00000000..41eef9f1 --- /dev/null +++ b/tests/cold/test_mtv_cold_clusterrole_migration.py @@ -0,0 +1,250 @@ +import pytest +from exceptions.exceptions import MigrationPlanExecError +from ocp_resources.network_map import NetworkMap +from ocp_resources.plan import Plan +from ocp_resources.storage_map import StorageMap +from pytest_testconfig import config as py_config + +from utilities.post_migration import verify_vms_running +from utilities.mtv_migration import ( + create_plan_resource, + execute_migration, + get_network_migration_map, + get_storage_migration_map, +) +from utilities.utils import get_value_from_py_config, populate_vm_ids + + +@pytest.mark.remote +@pytest.mark.tier0 +@pytest.mark.incremental +@pytest.mark.skipif( + not get_value_from_py_config("remote_ocp_cluster"), + reason="No remote OCP cluster provided", +) +@pytest.mark.parametrize( + "class_plan_config", + [pytest.param(py_config["tests_params"]["test_mtv_clusterrole_cold_migration"])], + indirect=True, + ids=["MTV-3129-clusterrole-cold"], +) +@pytest.mark.usefixtures("cleanup_migrated_vms") +class TestClusterroleColdMtvMigration: + """Verify ClusterRole (forklift-migrator-role) without SCC fails cold migration.""" + + storage_map: StorageMap + network_map: NetworkMap + plan_resource: Plan + + def test_create_storagemap( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + ): + """Create StorageMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.storage_map = get_storage_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + vms=vms, + ) + assert self.storage_map, "StorageMap creation failed" + + def test_create_networkmap( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + multus_network_name, + ): + """Create NetworkMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.network_map = get_network_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + multus_network_name=multus_network_name, + vms=vms, + ) + assert self.network_map, "NetworkMap creation failed" + + def test_create_plan( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + target_namespace, + source_provider_inventory, + ): + """Create MTV Plan CR resource.""" + populate_vm_ids(prepared_plan, source_provider_inventory) + + self.__class__.plan_resource = create_plan_resource( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + storage_map=self.storage_map, + network_map=self.network_map, + virtual_machines_list=prepared_plan["virtual_machines"], + target_namespace=target_namespace, + warm_migration=prepared_plan.get("warm_migration", False), + ) + assert self.plan_resource, "Plan creation failed" + + def test_migrate_vms( + self, + fixture_store, + ocp_admin_client, + target_namespace, + ): + """Execute migration, expecting failure without SCC binding.""" + with pytest.raises(MigrationPlanExecError): + execute_migration( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + plan=self.plan_resource, + target_namespace=target_namespace, + ) + + +@pytest.mark.remote +@pytest.mark.tier0 +@pytest.mark.incremental +@pytest.mark.skipif( + not get_value_from_py_config("remote_ocp_cluster"), + reason="No remote OCP cluster provided", +) +@pytest.mark.parametrize( + "class_plan_config", + [pytest.param(py_config["tests_params"]["test_mtv_clusterrole_cold_migration_with_scc"])], + indirect=True, + ids=["MTV-3129-clusterrole-cold-with-scc"], +) +@pytest.mark.usefixtures("cleanup_migrated_vms") +class TestClusterroleColdWithSccMigration: + """Verify ClusterRole (forklift-migrator-role) with SCC binding succeeds cold migration.""" + + storage_map: StorageMap + network_map: NetworkMap + plan_resource: Plan + + def test_create_storagemap( + self, + forklift_scc_binding, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + ): + """Create StorageMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.storage_map = get_storage_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + vms=vms, + ) + assert self.storage_map, "StorageMap creation failed" + + def test_create_networkmap( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + multus_network_name, + ): + """Create NetworkMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.network_map = get_network_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + multus_network_name=multus_network_name, + vms=vms, + ) + assert self.network_map, "NetworkMap creation failed" + + def test_create_plan( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + target_namespace, + source_provider_inventory, + ): + """Create MTV Plan CR resource.""" + populate_vm_ids(prepared_plan, source_provider_inventory) + + self.__class__.plan_resource = create_plan_resource( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + storage_map=self.storage_map, + network_map=self.network_map, + virtual_machines_list=prepared_plan["virtual_machines"], + target_namespace=target_namespace, + warm_migration=prepared_plan.get("warm_migration", False), + ) + assert self.plan_resource, "Plan creation failed" + + def test_migrate_vms( + self, + fixture_store, + ocp_admin_client, + target_namespace, + ): + """Execute migration.""" + execute_migration( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + plan=self.plan_resource, + target_namespace=target_namespace, + ) + + def test_check_vms( + self, + ocp_admin_client, + prepared_plan, + target_namespace, + ): + """Validate migrated VMs are running.""" + verify_vms_running( + ocp_admin_client=ocp_admin_client, + prepared_plan=prepared_plan, + target_namespace=target_namespace, + ) diff --git a/tests/tests_config/config.py b/tests/tests_config/config.py index 133b7f5d..73965380 100644 --- a/tests/tests_config/config.py +++ b/tests/tests_config/config.py @@ -475,6 +475,79 @@ "warm_migration": False, "copyoffload": True, }, + "test_target_scheduling_all_features": { + "virtual_machines": [ + { + "name": "mtv-tests-rhel8", + "source_vm_power": "on", + "guest_agent": True, + }, + ], + "warm_migration": False, + # MTV 2.10.0 target scheduling features + "target_node_selector": { + "mtv-test-node": None, # None = auto-generate with session_uuid + }, + "target_labels": { + "mtv-test-label": None, # None = auto-generate with session_uuid + "custom-label": "custom-value", # Static value + }, + "target_affinity": { + "podAffinity": { + "preferredDuringSchedulingIgnoredDuringExecution": [ + { + "podAffinityTerm": { + "labelSelector": {"matchLabels": {"app": "test"}}, + "topologyKey": "kubernetes.io/hostname", + }, + "weight": 50, + } + ] + } + }, + }, + "test_custom_nad_vm_namespace": { + "virtual_machines": [ + { + "name": "mtv-tests-rhel8", + "source_vm_power": "on", + "guest_agent": True, + }, + ], + "warm_migration": False, + "vm_target_namespace": "custom-vm-namespace", + "multus_namespace": "default", + }, + "test_mtv_clusterrole_cold_migration": { + "virtual_machines": [ + {"name": "mtv-tests-rhel8", "guest_agent": True}, + ], + "warm_migration": False, + }, + "test_mtv_clusterrole_warm_migration": { + "virtual_machines": [ + { + "name": "mtv-tests-rhel8", + "source_vm_power": "on", + "guest_agent": True, + }, + ], + "warm_migration": True, + }, + "test_mtv_clusterrole_cold_migration_with_scc": { + "virtual_machines": [{"name": "mtv-tests-rhel8", "source_vm_power": "on", "guest_agent": True}], + "warm_migration": False, + }, + "test_mtv_clusterrole_warm_migration_with_scc": { + "virtual_machines": [ + { + "name": "mtv-tests-rhel8", + "source_vm_power": "on", + "guest_agent": True, + }, + ], + "warm_migration": True, + }, "test_copyoffload_scale_migration": { "virtual_machines": [ { diff --git a/tests/warm/test_mtv_warm_clusterrole_migration.py b/tests/warm/test_mtv_warm_clusterrole_migration.py new file mode 100644 index 00000000..39d05100 --- /dev/null +++ b/tests/warm/test_mtv_warm_clusterrole_migration.py @@ -0,0 +1,255 @@ +import pytest +from exceptions.exceptions import MigrationPlanExecError +from ocp_resources.network_map import NetworkMap +from ocp_resources.plan import Plan +from ocp_resources.storage_map import StorageMap +from pytest_testconfig import config as py_config + +from utilities.post_migration import verify_vms_running +from utilities.migration_utils import get_cutover_value +from utilities.mtv_migration import ( + create_plan_resource, + execute_migration, + get_network_migration_map, + get_storage_migration_map, +) +from utilities.utils import get_value_from_py_config, populate_vm_ids + + +@pytest.mark.remote +@pytest.mark.tier0 +@pytest.mark.warm +@pytest.mark.incremental +@pytest.mark.skipif( + not get_value_from_py_config("remote_ocp_cluster"), + reason="No remote OCP cluster provided", +) +@pytest.mark.parametrize( + "class_plan_config", + [pytest.param(py_config["tests_params"]["test_mtv_clusterrole_warm_migration"])], + indirect=True, + ids=["MTV-3129-clusterrole-warm"], +) +@pytest.mark.usefixtures("cleanup_migrated_vms") +class TestClusterroleWarmMtvMigration: + """Verify ClusterRole (forklift-migrator-role) without SCC fails warm migration.""" + + storage_map: StorageMap + network_map: NetworkMap + plan_resource: Plan + + def test_create_storagemap( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + ): + """Create StorageMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.storage_map = get_storage_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + vms=vms, + ) + assert self.storage_map, "StorageMap creation failed" + + def test_create_networkmap( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + multus_network_name, + ): + """Create NetworkMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.network_map = get_network_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + multus_network_name=multus_network_name, + vms=vms, + ) + assert self.network_map, "NetworkMap creation failed" + + def test_create_plan( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + target_namespace, + source_provider_inventory, + ): + """Create MTV Plan CR resource.""" + populate_vm_ids(prepared_plan, source_provider_inventory) + + self.__class__.plan_resource = create_plan_resource( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + storage_map=self.storage_map, + network_map=self.network_map, + virtual_machines_list=prepared_plan["virtual_machines"], + target_namespace=target_namespace, + warm_migration=prepared_plan.get("warm_migration", False), + ) + assert self.plan_resource, "Plan creation failed" + + def test_migrate_vms( + self, + fixture_store, + ocp_admin_client, + target_namespace, + ): + """Execute warm migration, expecting failure without SCC binding.""" + with pytest.raises(MigrationPlanExecError): + execute_migration( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + plan=self.plan_resource, + target_namespace=target_namespace, + cut_over=get_cutover_value(), + ) + + +@pytest.mark.remote +@pytest.mark.tier0 +@pytest.mark.warm +@pytest.mark.incremental +@pytest.mark.skipif( + not get_value_from_py_config("remote_ocp_cluster"), + reason="No remote OCP cluster provided", +) +@pytest.mark.parametrize( + "class_plan_config", + [pytest.param(py_config["tests_params"]["test_mtv_clusterrole_warm_migration_with_scc"])], + indirect=True, + ids=["MTV-3129-clusterrole-warm-with-scc"], +) +@pytest.mark.usefixtures("cleanup_migrated_vms") +class TestClusterroleWarmWithSccMigration: + """Verify ClusterRole (forklift-migrator-role) with SCC binding succeeds warm migration.""" + + storage_map: StorageMap + network_map: NetworkMap + plan_resource: Plan + + def test_create_storagemap( + self, + forklift_scc_binding, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + ): + """Create StorageMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.storage_map = get_storage_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + vms=vms, + ) + assert self.storage_map, "StorageMap creation failed" + + def test_create_networkmap( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + source_provider_inventory, + target_namespace, + multus_network_name, + ): + """Create NetworkMap resource for migration.""" + vms = [vm["name"] for vm in prepared_plan["virtual_machines"]] + self.__class__.network_map = get_network_migration_map( + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + source_provider_inventory=source_provider_inventory, + ocp_admin_client=ocp_admin_client, + target_namespace=target_namespace, + multus_network_name=multus_network_name, + vms=vms, + ) + assert self.network_map, "NetworkMap creation failed" + + def test_create_plan( + self, + prepared_plan, + fixture_store, + ocp_admin_client, + source_provider, + clusterrole_destination_ocp_provider, + target_namespace, + source_provider_inventory, + ): + """Create MTV Plan CR resource.""" + populate_vm_ids(prepared_plan, source_provider_inventory) + + self.__class__.plan_resource = create_plan_resource( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + source_provider=source_provider, + destination_provider=clusterrole_destination_ocp_provider, + storage_map=self.storage_map, + network_map=self.network_map, + virtual_machines_list=prepared_plan["virtual_machines"], + target_namespace=target_namespace, + warm_migration=prepared_plan.get("warm_migration", False), + ) + assert self.plan_resource, "Plan creation failed" + + def test_migrate_vms( + self, + fixture_store, + ocp_admin_client, + target_namespace, + ): + """Execute warm migration.""" + execute_migration( + ocp_admin_client=ocp_admin_client, + fixture_store=fixture_store, + plan=self.plan_resource, + target_namespace=target_namespace, + cut_over=get_cutover_value(), + ) + + def test_check_vms( + self, + ocp_admin_client, + prepared_plan, + target_namespace, + ): + """Validate migrated VMs are running.""" + verify_vms_running( + ocp_admin_client=ocp_admin_client, + prepared_plan=prepared_plan, + target_namespace=target_namespace, + ) diff --git a/utilities/post_migration.py b/utilities/post_migration.py index fd10c28d..5f9de219 100644 --- a/utilities/post_migration.py +++ b/utilities/post_migration.py @@ -3,8 +3,9 @@ import base64 import ipaddress import tempfile +import time from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import go_template import jc @@ -14,6 +15,7 @@ from ocp_resources.provider import Provider from ocp_resources.secret import Secret from ocp_resources.storage_map import StorageMap +from ocp_resources.virtual_machine import VirtualMachine from paramiko.ssh_exception import AuthenticationException, ChannelException, NoValidConnectionsError, SSHException from pyhelper_utils.exceptions import CommandExecFailed from pytest_testconfig import py_config @@ -28,8 +30,14 @@ from utilities.utils import get_cluster_version, get_value_from_py_config, rhv_provider from utilities.vmware_guest_operations import DATA_INTEGRITY_FILE +if TYPE_CHECKING: + from kubernetes.dynamic import DynamicClient + LOGGER = get_logger(name=__name__) +VM_RUNNING_TIMEOUT = 300 +VM_RUNNING_POLL_INTERVAL = 5 + # Kubernetes resource name limits KUBERNETES_MAX_NAME_LENGTH: int = 63 KUBERNETES_MAX_GENERATE_NAME_PREFIX_LENGTH: int = 58 @@ -1559,3 +1567,55 @@ def check_vms( if failed_checks: failure_details = "; ".join(f"{vm_name}: [{', '.join(errors)}]" for vm_name, errors in failed_checks.items()) pytest.fail(f"VM validation failed — {failure_details}") + + +def verify_vms_running( + ocp_admin_client: "DynamicClient", + prepared_plan: dict[str, Any], + target_namespace: str, +) -> None: + """Assert each VM in the plan is Running in the target namespace. + + Waits for each VM resource to exist, then polls printableStatus until it + reaches Running. VMs typically transition through Provisioning -> Starting + -> Running after migration. The total per-VM budget is VM_RUNNING_TIMEOUT + seconds, shared between the initial wait for the resource to exist and the + subsequent status polling. + + Args: + ocp_admin_client (DynamicClient): OpenShift/Kubernetes client. + prepared_plan (dict[str, Any]): Plan configuration with virtual_machines list. + target_namespace (str): Target namespace to check VMs in. + + Raises: + AssertionError: If any VM does not reach Running status within the timeout. + """ + for vm_config in prepared_plan["virtual_machines"]: + vm = VirtualMachine( + client=ocp_admin_client, + name=vm_config["name"], + namespace=target_namespace, + ) + start = time.monotonic() + vm.wait(timeout=VM_RUNNING_TIMEOUT) + elapsed = time.monotonic() - start + remaining = max(VM_RUNNING_TIMEOUT - elapsed, 10) + LOGGER.info(f"VM {vm.name} resource exists, waiting for Running status") + last_status: str | None = None + try: + for sample in TimeoutSampler( + wait_timeout=remaining, + sleep=VM_RUNNING_POLL_INTERVAL, + func=lambda _vm=vm: _vm.instance.status.printableStatus, + ): + if sample == VirtualMachine.Status.RUNNING: + LOGGER.info(f"VM {vm.name} reached Running status") + break + if sample != last_status: + LOGGER.info(f"VM {vm.name} status: {sample}, waiting for Running") + last_status = sample + except TimeoutExpiredError: + raise AssertionError( + f"VM {vm.name} did not reach Running status within {VM_RUNNING_TIMEOUT}s. " + f"Last observed status: {last_status}" + ) from None