Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
run: pip install ".[test]"

- name: Run tests
run: sg libvirt -c "pytest tests/ -v --tb=short --ignore=tests/test_vm_manager_cluster.py"
run: sg libvirt -c "pytest tests/ -v --tb=short --ignore=tests/test_vm_manager_cluster.py --ignore=tests/test_vm_manager_cmd_cluster.py"

- name: Install documentation dependencies
run: pip install ".[docs]"
Expand Down
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ Test scripts are in `vm_manager/helpers/tests/pacemaker/` and `vm_manager/helper
- **License**: Apache-2.0 (all source files have copyright headers)
- **Code review**: Gerrit (`.gitreview` → `g1.sfl.team`)
- **VM naming**: alphanumeric only, validated by `_check_name()`
- **Disk naming**: system disks prefixed with `system_` (`OS_DISK_PREFIX` constant)
- **Disk naming**: system disks prefixed with `system_` (`OS_DISK_PREFIX`), additional disks prefixed with `data_` (`DATA_DISK_PREFIX`), e.g. `data_guest0_0`
- **Flake8 config** (`.flake8`): ignores F401 in `__init__.py`, E501 and W503 globally
- **Custom exceptions**: `RbdException`, `PacemakerException`
184 changes: 183 additions & 1 deletion tests/test_vm_manager_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,27 @@ def qcow2_image():
pass


@pytest.fixture
def additional_qcow2_images():
"""Create two small temporary qcow2 images for additional disk testing."""
paths = []
for _ in range(2):
with tempfile.NamedTemporaryFile(suffix=".qcow2", delete=False) as f:
path = f.name
subprocess.run(
["qemu-img", "create", "-f", "qcow2", path, "64M"],
check=True,
capture_output=True,
)
paths.append(path)
yield paths
for p in paths:
try:
os.unlink(p)
except OSError:
pass


@pytest.fixture
def created_vm(vm_name, qcow2_image):
"""Create a VM and return its name. Enables by default."""
Expand All @@ -98,6 +119,23 @@ def created_vm(vm_name, qcow2_image):
return vm_name


@pytest.fixture
def created_vm_with_additional_disks(
vm_name, qcow2_image, additional_qcow2_images
):
"""Create a VM with additional disks and return its name."""
xml = _read_test_xml()
vmc.create(
{
"name": vm_name,
"image": qcow2_image,
"base_xml": xml,
"additional_disks": additional_qcow2_images,
}
)
return vm_name


@pytest.fixture
def disabled_vm(vm_name, qcow2_image):
"""Create a disabled VM and return its name."""
Expand Down Expand Up @@ -521,7 +559,7 @@ def test_purge_invalid_args(self):
with pytest.raises(ValueError, match="not datetime"):
vmc.purge_image("vm", date="2025-01-01")

with pytest.raises(ValueError, match="positive integer"):
with pytest.raises(ValueError, match="non-negative integer"):
vmc.purge_image("vm", number=-1)


Expand Down Expand Up @@ -695,6 +733,150 @@ def test_add_multi_disk_vm_raises(self, vm_name, qcow2_image):
lvm.undefine(vm_name)


# ── additional disks ─────────────────────────────────────────────────


class TestAdditionalDisksCreate:
def test_create_with_additional_disks(
self, created_vm_with_additional_disks
):
assert created_vm_with_additional_disks in vmc.list_vms()

def test_create_with_additional_disks_force(
self,
created_vm_with_additional_disks,
qcow2_image,
additional_qcow2_images,
):
vmc.create(
{
"name": created_vm_with_additional_disks,
"image": qcow2_image,
"base_xml": _read_test_xml(),
"additional_disks": additional_qcow2_images,
"force": True,
}
)
assert created_vm_with_additional_disks in vmc.list_vms()

def test_create_additional_disk_missing_file_raises(
self, vm_name, qcow2_image
):
with pytest.raises(IOError):
vmc.create(
{
"name": vm_name,
"image": qcow2_image,
"base_xml": _read_test_xml(),
"additional_disks": ["/nonexistent/disk.qcow2"],
}
)


class TestAdditionalDisksRemove:
def test_remove_vm_with_additional_disks(
self, vm_name, qcow2_image, additional_qcow2_images
):
vmc.create(
{
"name": vm_name,
"image": qcow2_image,
"base_xml": _read_test_xml(),
"additional_disks": additional_qcow2_images,
}
)
vmc.remove(vm_name)
assert vmc.status(vm_name) == "Undefined"


class TestAdditionalDisksSnapshot:
def test_create_snapshot(self, created_vm_with_additional_disks):
vmc.create_snapshot(created_vm_with_additional_disks, "snap1")
assert "snap1" in vmc.list_snapshots(created_vm_with_additional_disks)

def test_remove_snapshot(self, created_vm_with_additional_disks):
vmc.create_snapshot(created_vm_with_additional_disks, "snap1")
vmc.remove_snapshot(created_vm_with_additional_disks, "snap1")
assert "snap1" not in vmc.list_snapshots(
created_vm_with_additional_disks
)

def test_rollback_snapshot(self, created_vm_with_additional_disks):
vmc.create_snapshot(created_vm_with_additional_disks, "snap1")
vmc.rollback_snapshot(created_vm_with_additional_disks, "snap1")

def test_purge_all_snapshots(self, created_vm_with_additional_disks):
vmc.create_snapshot(created_vm_with_additional_disks, "snap1")
vmc.create_snapshot(created_vm_with_additional_disks, "snap2")
vmc.purge_image(created_vm_with_additional_disks)
assert len(vmc.list_snapshots(created_vm_with_additional_disks)) == 0


class TestAdditionalDisksClone:
def test_clone_vm_with_additional_disks(
self, created_vm_with_additional_disks, second_vm_name
):
vmc.clone(
{
"name": created_vm_with_additional_disks,
"dst_name": second_vm_name,
}
)
assert second_vm_name in vmc.list_vms()
# Verify additional disks were cloned: snapshot operates on all
# disks in the Ceph group — would fail if additional disks are
# missing from the clone's group
vmc.create_snapshot(second_vm_name, "verifysnap")
vmc.remove_snapshot(second_vm_name, "verifysnap")

def test_clone_snapshot_preserved(
self, created_vm_with_additional_disks, second_vm_name
):
vmc.create_snapshot(created_vm_with_additional_disks, "snap1")
vmc.clone(
{
"name": created_vm_with_additional_disks,
"dst_name": second_vm_name,
}
)
assert "snap1" in vmc.list_snapshots(second_vm_name)


class TestAdditionalDisksCreateXml:
def test_additional_disks_in_xml(self):
xml = _read_test_xml()
result = vmc._create_xml(
xml, "myvm", additional_disks=["data_myvm_0", "data_myvm_1"]
)
root = ElementTree.fromstring(result)
disks = root.findall(".//disk[@type='network']")
assert len(disks) == 3 # system + 2 additional

def test_additional_disk_dev_names(self):
xml = _read_test_xml()
result = vmc._create_xml(
xml, "myvm", additional_disks=["data_myvm_0", "data_myvm_1"]
)
root = ElementTree.fromstring(result)
targets = [
d.find("target").get("dev")
for d in root.findall(".//disk[@type='network']")
]
assert targets == ["vda", "vdb", "vdc"]

def test_no_additional_disks(self):
xml = _read_test_xml()
result = vmc._create_xml(xml, "myvm")
root = ElementTree.fromstring(result)
disks = root.findall(".//disk[@type='network']")
assert len(disks) == 1

def test_additional_disk_ceph_source(self):
xml = _read_test_xml()
result = vmc._create_xml(xml, "myvm", additional_disks=["data_myvm_0"])
assert "data_myvm_0" in result


# ── console ──────────────────────────────────────────────────────────


Expand Down
76 changes: 76 additions & 0 deletions tests/test_vm_manager_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright (C) 2026, Sprecher Automation
# SPDX-License-Identifier: Apache-2.0

"""
Argparse-only tests for the vm_manager_cmd CLI.

These tests exercise the argparse layer in isolation and have no
cluster dependencies. The end-to-end test that drives main() through
the real backend lives in test_vm_manager_cmd_cluster.py (which CI
ignores because it needs Ceph/Pacemaker).
"""

import pytest

import vm_manager
from vm_manager.vm_manager_cmd import get_parser

pytestmark = pytest.mark.skipif(
not vm_manager.cluster_mode,
reason="--additional-disk is only registered in cluster mode",
)


BASE_CREATE_ARGS = [
"create",
"--name",
"vm1",
"--xml",
"/nonexistent/vm.xml",
"-i",
"/nonexistent/sys.qcow2",
]


class TestCreateAdditionalDiskFlag:
def test_single_additional_disk_parses_to_list(self):
parser = get_parser()
args = parser.parse_args(
BASE_CREATE_ARGS + ["--additional-disk", "/nonexistent/a.qcow2"]
)
assert args.additional_disks == ["/nonexistent/a.qcow2"]

def test_multiple_additional_disks_accumulate_in_order(self):
parser = get_parser()
args = parser.parse_args(
BASE_CREATE_ARGS
+ [
"--additional-disk",
"/nonexistent/a.qcow2",
"--additional-disk",
"/nonexistent/b.qcow2",
"--additional-disk",
"/nonexistent/c.qcow2",
]
)
assert args.additional_disks == [
"/nonexistent/a.qcow2",
"/nonexistent/b.qcow2",
"/nonexistent/c.qcow2",
]

def test_omitted_flag_defaults_to_none(self):
parser = get_parser()
args = parser.parse_args(BASE_CREATE_ARGS)
assert args.additional_disks is None

def test_additional_disk_singular_attr_not_used(self):
"""Guard against accidentally dropping dest= — without it,
argparse would derive args.additional_disk (singular) and the
backend would never see the list.
"""
parser = get_parser()
args = parser.parse_args(
BASE_CREATE_ARGS + ["--additional-disk", "/nonexistent/a.qcow2"]
)
assert not hasattr(args, "additional_disk")
Loading
Loading