From a498bb98ee7e748802e08098c547c6a75145fb88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathieu=20Dupr=C3=A9?= Date: Tue, 24 Mar 2026 10:50:25 +0100 Subject: [PATCH] Add XML generator to create libvirt domain XML from CLI options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make --xml optional in `vm-mgr create`. When omitted, XML is generated from new CLI flags (--vcpus, --memory, --cpuset, --rt, --hugepages, --balloon, --vnc, --secure-boot, --net, etc.), removing the need for a pre-existing XML file. This unblocks the Cockpit webui to pass individual VM parameters directly via the CLI. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Mathieu Dupré --- tests/test_xml_generator.py | 416 +++++++++++++++++++++++++++++++++++ vm_manager/vm_manager_cmd.py | 101 ++++++++- vm_manager/xml_generator.py | 299 +++++++++++++++++++++++++ 3 files changed, 813 insertions(+), 3 deletions(-) create mode 100644 tests/test_xml_generator.py create mode 100644 vm_manager/xml_generator.py diff --git a/tests/test_xml_generator.py b/tests/test_xml_generator.py new file mode 100644 index 0000000..4eb7924 --- /dev/null +++ b/tests/test_xml_generator.py @@ -0,0 +1,416 @@ +# Copyright (C) 2026 Savoir-faire Linux Inc. +# SPDX-License-Identifier: Apache-2.0 + +import xml.etree.ElementTree as ET + +import pytest + +from vm_manager.xml_generator import ( + generate_xml, + _parse_net_arg, +) + + +def _parse(xml_str): + return ET.fromstring(xml_str) + + +class TestMinimalXml: + def test_domain_type(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.tag == "domain" + assert root.get("type") == "kvm" + + def test_name(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.findtext("name") == "testvm" + + def test_no_uuid(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.find("uuid") is None + + def test_default_vcpus(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.findtext("vcpu") == "1" + + def test_default_memory(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.findtext("memory") == "2048" + assert root.find("memory").get("unit") == "MiB" + + def test_devices_present(self): + root = _parse(generate_xml({"name": "testvm"})) + devices = root.find("devices") + assert devices is not None + assert devices.findtext("emulator") == ("/usr/bin/qemu-system-x86_64") + + def test_memballoon_none_by_default(self): + root = _parse(generate_xml({"name": "testvm"})) + mb = root.find("devices/memballoon") + assert mb.get("model") == "none" + + def test_watchdog(self): + root = _parse(generate_xml({"name": "testvm"})) + wd = root.find("devices/watchdog") + assert wd is not None + assert wd.get("model") == "i6300esb" + + def test_os_firmware(self): + root = _parse(generate_xml({"name": "testvm"})) + os_el = root.find("os") + assert os_el.get("firmware") == "efi" + assert os_el.find("type").text == "hvm" + assert os_el.find("type").get("machine") == "q35" + + def test_secure_boot_disabled_by_default(self): + root = _parse(generate_xml({"name": "testvm"})) + feat = root.find("os/firmware/feature") + assert feat.get("name") == "secure-boot" + assert feat.get("enabled") == "no" + + def test_clock(self): + root = _parse(generate_xml({"name": "testvm"})) + clock = root.find("clock") + assert clock.get("offset") == "utc" + + def test_pcie_root(self): + root = _parse(generate_xml({"name": "testvm"})) + ctrl = root.find("devices/controller[@type='pci']") + assert ctrl.get("model") == "pcie-root" + + def test_serial_console(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.find("devices/serial") is not None + assert root.find("devices/console") is not None + + def test_no_pmu_by_default(self): + root = _parse(generate_xml({"name": "testvm"})) + assert root.find("features/pmu") is None + + def test_cpu_host_model_by_default(self): + root = _parse(generate_xml({"name": "testvm"})) + cpu = root.find("cpu") + assert cpu.get("mode") == "host-model" + + +class TestDescription: + def test_description_set(self): + root = _parse(generate_xml({"name": "vm", "description": "My VM"})) + assert root.findtext("description") == "My VM" + + def test_no_description_by_default(self): + root = _parse(generate_xml({"name": "vm"})) + assert root.find("description") is None + + +class TestRtOptions: + def setup_method(self): + self.root = _parse( + generate_xml( + { + "name": "rtvm", + "rt": True, + "cpuset": [2, 3], + "rt_priority": 5, + "emulatorpin": "0,1", + } + ) + ) + + def test_cpu_mode(self): + cpu = self.root.find("cpu") + assert cpu.get("mode") == "host-passthrough" + + def test_topology(self): + topo = self.root.find("cpu/topology") + assert topo.get("cores") == "2" + assert topo.get("sockets") == "1" + + def test_tsc_deadline(self): + feat = self.root.find("cpu/feature") + assert feat.get("name") == "tsc-deadline" + + def test_pmu_off(self): + pmu = self.root.find("features/pmu") + assert pmu.get("state") == "off" + + def test_vcpupin(self): + pins = self.root.findall("cputune/vcpupin") + assert len(pins) == 2 + assert pins[0].get("cpuset") == "2" + assert pins[1].get("cpuset") == "3" + + def test_vcpusched(self): + scheds = self.root.findall("cputune/vcpusched") + assert len(scheds) == 2 + assert scheds[0].get("scheduler") == "fifo" + assert scheds[0].get("priority") == "5" + + def test_emulatorpin(self): + ep = self.root.find("cputune/emulatorpin") + assert ep.get("cpuset") == "0,1" + + def test_vcpu_count_from_cpuset(self): + assert self.root.findtext("vcpu") == "2" + + +class TestHugepages: + def setup_method(self): + self.root = _parse( + generate_xml( + { + "name": "hpvm", + "hugepages": True, + "vcpus": 2, + } + ) + ) + + def test_memory_gib(self): + assert self.root.find("memory").get("unit") == "GiB" + assert self.root.findtext("memory") == "1" + + def test_memory_backing(self): + mb = self.root.find("memoryBacking") + assert mb is not None + assert mb.find("hugepages/page") is not None + assert mb.find("nosharepages") is not None + + def test_numa_cell(self): + cell = self.root.find("cpu/numa/cell") + assert cell is not None + assert cell.get("cpus") == "0-1" + assert cell.get("memAccess") == "shared" + + +class TestBalloon: + def test_virtio_balloon(self): + root = _parse(generate_xml({"name": "vm", "balloon": True})) + mb = root.find("devices/memballoon") + assert mb.get("model") == "virtio" + assert mb.find("stats").get("period") == "5" + + +class TestVnc: + def setup_method(self): + self.root = _parse(generate_xml({"name": "vm", "vnc": True})) + + def test_graphics(self): + gfx = self.root.find("devices/graphics") + assert gfx.get("type") == "vnc" + assert gfx.get("autoport") == "yes" + + def test_video(self): + video = self.root.find("devices/video/model") + assert video.get("type") == "virtio" + + def test_tablet_input(self): + inp = self.root.find("devices/input") + assert inp.get("type") == "tablet" + assert inp.get("bus") == "usb" + + def test_custom_listen(self): + root = _parse( + generate_xml({"name": "vm", "vnc": True, "vnc_listen": "0.0.0.0"}) + ) + gfx = root.find("devices/graphics") + assert gfx.get("listen") == "0.0.0.0" + + +class TestSecureBoot: + def test_secure_boot_enabled(self): + root = _parse(generate_xml({"name": "vm", "secure_boot": True})) + feat = root.find("os/firmware/feature") + assert feat.get("name") == "secure-boot" + assert feat.get("enabled") == "yes" + + +class TestNetworkBridge: + def test_bridge_interface(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": ["type=bridge,source=br0," "mac=52:54:00:00:00:01"], + } + ) + ) + iface = root.find("devices/interface[@type='bridge']") + assert iface is not None + assert iface.find("source").get("bridge") == "br0" + assert iface.find("mac").get("address") == "52:54:00:00:00:01" + assert iface.find("model").get("type") == "virtio" + + def test_bridge_with_vlan(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": [ + "type=bridge,source=br0," + "mac=52:54:00:00:00:01,vlan=100" + ], + } + ) + ) + tag = root.find("devices/interface/vlan/tag") + assert tag.get("id") == "100" + + def test_bridge_with_virtualport(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": [ + "type=bridge,source=br0," + "mac=52:54:00:00:00:01," + "virtualport=openvswitch" + ], + } + ) + ) + vp = root.find("devices/interface/virtualport") + assert vp.get("type") == "openvswitch" + + +class TestNetworkMacvtap: + def test_macvtap_interface(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": [ + "type=macvtap,source=eth0," + "mac=52:54:00:00:00:02,mode=bridge" + ], + } + ) + ) + iface = root.find("devices/interface[@type='direct']") + assert iface is not None + assert iface.find("source").get("dev") == "eth0" + assert iface.find("source").get("mode") == "bridge" + + def test_macvtap_trust_guest_rx(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": [ + "type=macvtap,source=eth0," + "mac=52:54:00:00:00:02," + "trust_guest_rx=yes" + ], + } + ) + ) + iface = root.find("devices/interface[@type='direct']") + assert iface.get("trustGuestRxFilters") == "yes" + + +class TestNetworkPci: + def test_pci_passthrough(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": ["type=pci,address=0000:03:00.0"], + } + ) + ) + hostdev = root.find("devices/hostdev[@type='pci']") + assert hostdev is not None + addr = hostdev.find("source/address") + assert addr.get("domain") == "0x0000" + assert addr.get("bus") == "0x03" + assert addr.get("slot") == "0x00" + assert addr.get("function") == "0x0" + + +class TestNetworkSriov: + def test_sriov_interface(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": ["type=sriov,network=sriov-net"], + } + ) + ) + iface = root.find("devices/interface[@type='network']") + assert iface.find("source").get("network") == "sriov-net" + + +class TestNetworkOvs: + def test_ovs_interface(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": ["type=ovs,mac=52:54:00:00:00:03," "port=vnet0"], + } + ) + ) + iface = root.find("devices/interface[@type='ethernet']") + assert iface is not None + assert iface.find("mac").get("address") == "52:54:00:00:00:03" + assert iface.find("target").get("dev") == "vnet0" + + +class TestNetworkMultiple: + def test_mixed_interfaces(self): + root = _parse( + generate_xml( + { + "name": "vm", + "net": [ + "type=bridge,source=br0," "mac=52:54:00:00:00:01", + "type=sriov,network=sriov-net", + ], + } + ) + ) + ifaces = root.findall("devices/interface") + hostdevs = root.findall("devices/hostdev") + assert len(ifaces) + len(hostdevs) == 2 + + +class TestParseNetArgValid: + def test_bridge(self): + d = _parse_net_arg("type=bridge,source=br0,mac=52:54:00:00:00:01") + assert d["type"] == "bridge" + assert d["source"] == "br0" + + def test_macvtap(self): + d = _parse_net_arg("type=macvtap,source=eth0,mac=52:54:00:00:00:01") + assert d["type"] == "macvtap" + + def test_pci(self): + d = _parse_net_arg("type=pci,address=0000:03:00.0") + assert d["address"] == "0000:03:00.0" + + def test_sriov(self): + d = _parse_net_arg("type=sriov,network=sriov-net") + assert d["network"] == "sriov-net" + + def test_ovs(self): + d = _parse_net_arg("type=ovs,mac=52:54:00:00:00:01,port=vnet0") + assert d["port"] == "vnet0" + + +class TestParseNetArgInvalid: + def test_missing_type(self): + with pytest.raises(ValueError, match="Missing 'type'"): + _parse_net_arg("source=br0,mac=52:54:00:00:00:01") + + def test_unknown_type(self): + with pytest.raises(ValueError, match="Unknown network type"): + _parse_net_arg("type=foobar,source=br0") + + def test_missing_required_key(self): + with pytest.raises(ValueError, match="Missing required key"): + _parse_net_arg("type=bridge,source=br0") + + def test_bad_format(self): + with pytest.raises(ValueError, match="expected key=value"): + _parse_net_arg("type=bridge,noseparator") diff --git a/vm_manager/vm_manager_cmd.py b/vm_manager/vm_manager_cmd.py index 226a727..06c44ed 100755 --- a/vm_manager/vm_manager_cmd.py +++ b/vm_manager/vm_manager_cmd.py @@ -133,7 +133,90 @@ def get_parser(): help="The VM name", ) create_parser.add_argument( - "--xml", type=str, required=True, help="VM libvirt XML path" + "--xml", + type=str, + required=False, + default=None, + help="VM libvirt XML path (optional if generation options are" + " used)", + ) + create_parser.add_argument( + "--vcpus", type=int, default=1, help="Number of vCPUs" + ) + create_parser.add_argument( + "--memory", + type=int, + default=2048, + help="RAM in MiB (default 2048)", + ) + create_parser.add_argument( + "--cpuset", + type=str, + default=None, + help="Comma-separated host CPU list for pinning, " 'e.g. "2,3,4,5"', + ) + create_parser.add_argument( + "--description", + type=str, + default=None, + help="VM description", + ) + create_parser.add_argument( + "--rt", + action="store_true", + default=False, + help="Enable real-time (FIFO scheduling, PMU off, " + "host-passthrough)", + ) + create_parser.add_argument( + "--rt-priority", + type=int, + default=1, + help="RT FIFO priority (default 1)", + ) + create_parser.add_argument( + "--emulatorpin", + type=str, + default=None, + help="Emulator thread CPU pinning", + ) + create_parser.add_argument( + "--hugepages", + action="store_true", + default=False, + help="Enable 1GiB hugepages memory with NUMA", + ) + create_parser.add_argument( + "--balloon", + action="store_true", + default=False, + help="Enable virtio memballoon", + ) + create_parser.add_argument( + "--vnc", + action="store_true", + default=False, + help="Enable VNC graphics", + ) + create_parser.add_argument( + "--vnc-listen", + type=str, + default="127.0.0.1", + help="VNC listen address (default 127.0.0.1)", + ) + create_parser.add_argument( + "--secure-boot", + action="store_true", + default=False, + help="Enable UEFI secure boot", + ) + create_parser.add_argument( + "--net", + type=str, + action="append", + default=None, + help="Network interface (repeatable). Format: " + "type=bridge,source=br0,mac=xx:xx:xx:xx:xx:xx[,vlan=N]", ) stop_parser.add_argument( "-f", @@ -539,8 +622,20 @@ def main(): elif args.command == "remove": vm_manager.remove(args.name) elif args.command == "create": - with open(args.xml, "r") as xml: - args.base_xml = xml.read() + if args.xml: + with open(args.xml, "r") as xml: + args.base_xml = xml.read() + else: + from vm_manager.xml_generator import generate_xml + + gen_opts = vars(args).copy() + if gen_opts.get("cpuset"): + gen_opts["cpuset"] = [ + int(c) for c in gen_opts["cpuset"].split(",") + ] + # argparse already maps --secure-boot → secure_boot, + # --rt-priority → rt_priority, --vnc-listen → vnc_listen + args.base_xml = generate_xml(gen_opts) if "live_migration" in args: args.live_migration = args.enable_live_migration if "add_crm_config_cmd" in args: diff --git a/vm_manager/xml_generator.py b/vm_manager/xml_generator.py new file mode 100644 index 0000000..73b364c --- /dev/null +++ b/vm_manager/xml_generator.py @@ -0,0 +1,299 @@ +# Copyright (C) 2026 Savoir-faire Linux Inc. +# SPDX-License-Identifier: Apache-2.0 + +""" +Generate libvirt domain XML from a dictionary of VM options. + +This replaces the need for a pre-existing XML file when creating VMs, +allowing the CLI (and Cockpit webui) to pass individual parameters. +""" + +import xml.etree.ElementTree as ET + + +def generate_xml(options): + """ + Build a complete ```` XML string. + + :param options: dict of VM options (see plan for keys) + :return: XML string + """ + name = options["name"] + vcpus = options.get("vcpus", 1) + cpuset = options.get("cpuset") + if cpuset: + vcpus = len(cpuset) + memory = options.get("memory", 2048) + description = options.get("description") + rt = options.get("rt", False) + rt_priority = options.get("rt_priority", 1) + emulatorpin = options.get("emulatorpin") + hugepages = options.get("hugepages", False) + balloon = options.get("balloon", False) + vnc = options.get("vnc", False) + vnc_listen = options.get("vnc_listen", "127.0.0.1") + secure_boot = options.get("secure_boot", False) + net_args = options.get("net") or [] + + domain = ET.Element("domain", type="kvm") + + # Name and description + ET.SubElement(domain, "name").text = name + if description: + ET.SubElement(domain, "description").text = description + + # vCPUs + vcpu_el = ET.SubElement(domain, "vcpu", placement="static") + vcpu_el.text = str(vcpus) + + # Memory + if hugepages: + ET.SubElement(domain, "memory", unit="GiB").text = "1" + ET.SubElement(domain, "currentMemory", unit="GiB").text = "1" + mb = ET.SubElement(domain, "memoryBacking") + hp = ET.SubElement(mb, "hugepages") + ET.SubElement(hp, "page", size="1", unit="G") + ET.SubElement(mb, "nosharepages") + else: + ET.SubElement(domain, "memory", unit="MiB").text = str(memory) + ET.SubElement(domain, "currentMemory", unit="MiB").text = str(memory) + + # OS + os_el = ET.SubElement(domain, "os", firmware="efi") + ET.SubElement(os_el, "type", arch="x86_64", machine="q35").text = "hvm" + ET.SubElement(os_el, "boot", dev="hd") + ET.SubElement(os_el, "bootmenu", enable="no") + ET.SubElement(os_el, "bios", useserial="yes", rebootTimeout="0") + ET.SubElement(os_el, "smbios", mode="emulate") + firmware_el = ET.SubElement(os_el, "firmware") + sb_val = "yes" if secure_boot else "no" + ET.SubElement(firmware_el, "feature", enabled=sb_val, name="secure-boot") + + # Features + features = ET.SubElement(domain, "features") + ET.SubElement(features, "acpi") + ET.SubElement(features, "apic") + ET.SubElement(features, "vmport", state="off") + if rt: + ET.SubElement(features, "pmu", state="off") + + # CPU tune + cputune = ET.SubElement(domain, "cputune") + if cpuset: + for i, cpu in enumerate(cpuset): + ET.SubElement(cputune, "vcpupin", vcpu=str(i), cpuset=str(cpu)) + if rt: + ET.SubElement( + cputune, + "vcpusched", + vcpus=str(i), + scheduler="fifo", + priority=str(rt_priority), + ) + if emulatorpin: + ET.SubElement(cputune, "emulatorpin", cpuset=emulatorpin) + + # CPU model + if rt: + cpu_el = ET.SubElement(domain, "cpu", mode="host-passthrough") + ET.SubElement( + cpu_el, + "topology", + sockets="1", + dies="1", + cores=str(vcpus), + threads="1", + ) + ET.SubElement(cpu_el, "feature", policy="require", name="tsc-deadline") + else: + cpu_el = ET.SubElement( + domain, "cpu", mode="host-model", check="partial" + ) + ET.SubElement(cpu_el, "model", fallback="allow") + + # NUMA cell for hugepages + if hugepages: + numa = ET.SubElement(cpu_el, "numa") + cpus_str = "0-{}".format(vcpus - 1) if vcpus > 1 else "0" + ET.SubElement( + numa, + "cell", + id="0", + cpus=cpus_str, + memory="1", + unit="GiB", + memAccess="shared", + ) + + # Clock + clock = ET.SubElement(domain, "clock", offset="utc") + ET.SubElement(clock, "timer", name="rtc", tickpolicy="catchup") + ET.SubElement(clock, "timer", name="pit", tickpolicy="delay") + ET.SubElement(clock, "timer", name="hpet", present="no") + + # Power management + ET.SubElement(domain, "on_poweroff").text = "destroy" + ET.SubElement(domain, "on_reboot").text = "restart" + ET.SubElement(domain, "on_crash").text = "destroy" + pm = ET.SubElement(domain, "pm") + ET.SubElement(pm, "suspend-to-mem", enabled="no") + ET.SubElement(pm, "suspend-to-disk", enabled="no") + + # Devices + devices = ET.SubElement(domain, "devices") + ET.SubElement(devices, "emulator").text = "/usr/bin/qemu-system-x86_64" + + # VNC + if vnc: + gfx = ET.SubElement( + devices, + "graphics", + type="vnc", + port="-1", + autoport="yes", + listen=vnc_listen, + ) + ET.SubElement(gfx, "listen", type="address", address=vnc_listen) + video = ET.SubElement(devices, "video") + ET.SubElement(video, "model", type="virtio", heads="1", primary="yes") + ET.SubElement(devices, "input", type="tablet", bus="usb") + + # Network interfaces + for net_str in net_args: + net_dict = _parse_net_arg(net_str) + _add_net_to_devices(devices, net_dict) + + # Standard controllers / serial / console + ET.SubElement( + devices, "controller", type="pci", index="0", model="pcie-root" + ) + serial = ET.SubElement(devices, "serial", type="pty") + target = ET.SubElement(serial, "target", type="isa-serial", port="0") + ET.SubElement(target, "model", name="isa-serial") + console = ET.SubElement(devices, "console", type="pty") + ET.SubElement(console, "target", type="serial", port="0") + + # Memballoon + if balloon: + mb_el = ET.SubElement(devices, "memballoon", model="virtio") + ET.SubElement(mb_el, "stats", period="5") + else: + ET.SubElement(devices, "memballoon", model="none") + + # Watchdog + ET.SubElement(devices, "watchdog", model="i6300esb", action="poweroff") + + ET.indent(domain) + return ET.tostring(domain, encoding="unicode", xml_declaration=False) + + +def _parse_net_arg(net_str): + """ + Parse a ``--net`` CLI value into a dict. + + Format: ``type=bridge,source=br0,mac=52:54:00:00:00:01,vlan=100`` + + :param net_str: comma-separated key=value string + :return: dict with parsed keys + :raises ValueError: on missing required keys or unknown type + """ + pairs = net_str.split(",") + d = {} + for pair in pairs: + if "=" not in pair: + raise ValueError( + "Invalid net argument '{}': expected key=value".format(pair) + ) + key, value = pair.split("=", 1) + d[key] = value + + net_type = d.get("type") + if not net_type: + raise ValueError("Missing 'type' in --net argument") + + required = { + "bridge": ["source", "mac"], + "macvtap": ["source", "mac"], + "pci": ["address"], + "sriov": ["network"], + "ovs": ["mac", "port"], + } + if net_type not in required: + raise ValueError("Unknown network type '{}'".format(net_type)) + + for key in required[net_type]: + if key not in d: + raise ValueError( + "Missing required key '{}' for network type '{}'".format( + key, net_type + ) + ) + + return d + + +def _add_net_to_devices(devices, net_dict): + """ + Add XML elements for one network interface to ````. + + :param devices: the ```` Element + :param net_dict: parsed dict from :func:`_parse_net_arg` + """ + net_type = net_dict["type"] + + if net_type == "bridge": + iface = ET.SubElement(devices, "interface", type="bridge") + ET.SubElement(iface, "source", bridge=net_dict["source"]) + ET.SubElement(iface, "mac", address=net_dict["mac"]) + ET.SubElement(iface, "model", type="virtio") + if "virtualport" in net_dict: + ET.SubElement(iface, "virtualport", type=net_dict["virtualport"]) + if "vlan" in net_dict: + vlan_el = ET.SubElement(iface, "vlan") + ET.SubElement(vlan_el, "tag", id=net_dict["vlan"]) + + elif net_type == "macvtap": + attrs = {"type": "direct"} + if net_dict.get("trust_guest_rx"): + attrs["trustGuestRxFilters"] = "yes" + iface = ET.SubElement(devices, "interface", **attrs) + src_attrs = {"dev": net_dict["source"]} + if "mode" in net_dict: + src_attrs["mode"] = net_dict["mode"] + ET.SubElement(iface, "source", **src_attrs) + ET.SubElement(iface, "mac", address=net_dict["mac"]) + ET.SubElement(iface, "model", type="virtio") + + elif net_type == "pci": + addr = net_dict["address"] + parts = addr.replace(".", ":").split(":") + if len(parts) != 4: + raise ValueError( + "PCI address must be DDDD:BB:SS.F, got '{}'".format(addr) + ) + hostdev = ET.SubElement( + devices, + "hostdev", + mode="subsystem", + type="pci", + managed="yes", + ) + source = ET.SubElement(hostdev, "source") + ET.SubElement( + source, + "address", + domain="0x" + parts[0], + bus="0x" + parts[1], + slot="0x" + parts[2], + function="0x" + parts[3], + ) + + elif net_type == "sriov": + iface = ET.SubElement(devices, "interface", type="network") + ET.SubElement(iface, "source", network=net_dict["network"]) + + elif net_type == "ovs": + iface = ET.SubElement(devices, "interface", type="ethernet") + ET.SubElement(iface, "mac", address=net_dict["mac"]) + ET.SubElement(iface, "target", dev=net_dict["port"], managed="no") + ET.SubElement(iface, "model", type="virtio")