From d8584de9713fe34b8e6fcf431f2c11e0ef7c97ab Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 29 Jan 2026 21:01:29 +0000 Subject: [PATCH 1/2] Add virtual address support to memory segments This change enables SDB integration by allowing memory segments to have both physical and virtual addresses. When drgn loads a vmcore created with kdumpling, it can now read memory by virtual address directly. Changes: - Add virt_addr parameter to add_memory_segment() (defaults to phys_addr) - Add effective_virt_addr property to MemorySegment dataclass - Update _write_elf() to set p_vaddr in PT_LOAD headers - Update DumpStats to include virtual addresses in segment tuples - Add comprehensive tests for virtual address support - Add drgn integration tests for virtual address memory reading https://claude.ai/code/session_018wj4HefWHH9naQ29a4pwr8 --- kdumpling/builder.py | 56 ++++++++-- tests/test_builder.py | 196 ++++++++++++++++++++++++++++++++- tests/test_drgn_integration.py | 67 +++++++++++ 3 files changed, 308 insertions(+), 11 deletions(-) diff --git a/kdumpling/builder.py b/kdumpling/builder.py index de35e3a..16553d5 100644 --- a/kdumpling/builder.py +++ b/kdumpling/builder.py @@ -115,7 +115,7 @@ class DumpStats: total_memory_size: int vmcoreinfo_size: int estimated_file_size: int - memory_segments: list[tuple[int, int]] # List of (phys_addr, size) tuples + memory_segments: list[tuple[int, int, int]] # List of (phys_addr, virt_addr, size) @property def total_memory_size_human(self) -> str: @@ -140,17 +140,31 @@ def __str__(self) -> str: ] if self.memory_segments: lines.append(" Segments:") - for phys_addr, size in self.memory_segments: - lines.append(f" 0x{phys_addr:016x}: {_format_size(size)}") + for phys_addr, virt_addr, size in self.memory_segments: + if phys_addr == virt_addr: + lines.append(f" 0x{phys_addr:016x}: {_format_size(size)}") + else: + lines.append( + f" phys=0x{phys_addr:016x} virt=0x{virt_addr:016x}: " + f"{_format_size(size)}" + ) return "\n".join(lines) @dataclass class MemorySegment: - """Represents a memory segment to be included in the dump.""" + """Represents a memory segment to be included in the dump. + + Attributes: + phys_addr: Physical memory address where this segment resides + data: The memory data (bytes, file path, or file-like object) + virt_addr: Virtual memory address (optional, defaults to phys_addr if None) + size: Size of the segment in bytes (computed automatically) + """ phys_addr: int data: bytes | str | BinaryIO + virt_addr: int | None = None size: int = 0 def __post_init__(self) -> None: @@ -166,6 +180,11 @@ def __post_init__(self) -> None: self.size = self.data.tell() self.data.seek(current_pos) # Restore position + @property + def effective_virt_addr(self) -> int: + """Return the virtual address to use (virt_addr if set, otherwise phys_addr).""" + return self.virt_addr if self.virt_addr is not None else self.phys_addr + def get_data(self) -> bytes: """Read and return the segment data as bytes.""" if isinstance(self.data, bytes): @@ -268,7 +287,10 @@ def set_vmcoreinfo(self, data: str | bytes) -> KdumpBuilder: return self def add_memory_segment( - self, phys_addr: int, data: bytes | str | BinaryIO + self, + phys_addr: int, + data: bytes | str | BinaryIO, + virt_addr: int | None = None, ) -> KdumpBuilder: """ Add a memory segment to the dump. @@ -279,11 +301,25 @@ def add_memory_segment( - bytes: Raw memory content - str: Path to a file containing the data - BinaryIO: File-like object to read from + virt_addr: Optional virtual address for this segment. If not specified, + defaults to the physical address. This is useful for tools + like drgn that need to read memory by virtual address. Returns: self for method chaining + + Example: + # Physical address only (virt_addr defaults to phys_addr) + builder.add_memory_segment(phys_addr=0x100000, data=memory_bytes) + + # With explicit virtual address (for kernel memory mappings) + builder.add_memory_segment( + phys_addr=0x100000, + data=memory_bytes, + virt_addr=0xffff888000100000 + ) """ - segment = MemorySegment(phys_addr=phys_addr, data=data) + segment = MemorySegment(phys_addr=phys_addr, data=data, virt_addr=virt_addr) self._segments.append(segment) return self @@ -503,8 +539,10 @@ def stats(self) -> DumpStats: + total_memory # Memory data ) - # Build segment list - segment_list = [(seg.phys_addr, seg.size) for seg in self._segments] + # Build segment list with (phys_addr, virt_addr, size) + segment_list = [ + (seg.phys_addr, seg.effective_virt_addr, seg.size) for seg in self._segments + ] return DumpStats( architecture=self.arch, @@ -647,7 +685,7 @@ def _write_elf(self, output_path: str) -> None: p_type=PhdrType.PT_LOAD, p_flags=PhdrFlags.PF_R | PhdrFlags.PF_W, p_offset=current_offset, - p_vaddr=0, # Not used for physical memory dumps + p_vaddr=segment.effective_virt_addr, p_paddr=segment.phys_addr, p_filesz=segment.size, p_memsz=segment.size, diff --git a/tests/test_builder.py b/tests/test_builder.py index 7a1f7b3..afa8a24 100644 --- a/tests/test_builder.py +++ b/tests/test_builder.py @@ -522,8 +522,9 @@ def test_stats_with_memory_segments(self) -> None: assert stats.num_memory_segments == 2 assert stats.total_memory_size == 4096 + 8192 assert len(stats.memory_segments) == 2 - assert stats.memory_segments[0] == (0x100000, 4096) - assert stats.memory_segments[1] == (0x200000, 8192) + # Format: (phys_addr, virt_addr, size) - virt_addr defaults to phys_addr + assert stats.memory_segments[0] == (0x100000, 0x100000, 4096) + assert stats.memory_segments[1] == (0x200000, 0x200000, 8192) def test_stats_with_vmcoreinfo(self) -> None: """Test stats with vmcoreinfo.""" @@ -610,3 +611,194 @@ def test_stats_different_architectures(self) -> None: builder = KdumpBuilder(arch=arch) stats = builder.stats assert stats.architecture == arch + + +class TestVirtualAddressSupport: + """Tests for virtual address support in memory segments.""" + + def test_segment_with_explicit_virt_addr(self) -> None: + """Test creating a segment with an explicit virtual address.""" + test_data = b"\xde\xad\xbe\xef" * 1024 + phys_addr = 0x100000 + virt_addr = 0xFFFF888000100000 # Typical kernel direct mapping + + builder = KdumpBuilder(arch="x86_64") + builder.add_memory_segment( + phys_addr=phys_addr, data=test_data, virt_addr=virt_addr + ) + + with tempfile.NamedTemporaryFile(suffix=".vmcore", delete=False) as f: + output_path = f.name + + try: + builder.write(output_path) + + with open(output_path, "rb") as f: + elf = ELFFile(f) + + load_segments = [ + s for s in elf.iter_segments() if s["p_type"] == "PT_LOAD" + ] + assert len(load_segments) == 1 + + load_segment = load_segments[0] + assert load_segment["p_paddr"] == phys_addr + assert load_segment["p_vaddr"] == virt_addr + assert load_segment.data() == test_data + finally: + os.unlink(output_path) + + def test_segment_without_virt_addr_defaults_to_phys(self) -> None: + """Test that segments without virt_addr default to phys_addr.""" + test_data = b"\xca\xfe" * 512 + phys_addr = 0x200000 + + builder = KdumpBuilder(arch="x86_64") + builder.add_memory_segment(phys_addr=phys_addr, data=test_data) + + with tempfile.NamedTemporaryFile(suffix=".vmcore", delete=False) as f: + output_path = f.name + + try: + builder.write(output_path) + + with open(output_path, "rb") as f: + elf = ELFFile(f) + + load_segments = [ + s for s in elf.iter_segments() if s["p_type"] == "PT_LOAD" + ] + assert len(load_segments) == 1 + + load_segment = load_segments[0] + # When virt_addr is not specified, it should default to phys_addr + assert load_segment["p_paddr"] == phys_addr + assert load_segment["p_vaddr"] == phys_addr + finally: + os.unlink(output_path) + + def test_multiple_segments_with_different_virt_addrs(self) -> None: + """Test multiple segments with various virtual address configurations.""" + segments_config = [ + # (phys_addr, virt_addr, data) + (0x100000, 0xFFFF888000100000, b"\x11" * 4096), + (0x200000, None, b"\x22" * 4096), # Should default to phys_addr + (0x300000, 0xFFFFFFFF81300000, b"\x33" * 4096), # Kernel text mapping + ] + + builder = KdumpBuilder(arch="x86_64") + for phys_addr, virt_addr, data in segments_config: + builder.add_memory_segment( + phys_addr=phys_addr, data=data, virt_addr=virt_addr + ) + + with tempfile.NamedTemporaryFile(suffix=".vmcore", delete=False) as f: + output_path = f.name + + try: + builder.write(output_path) + + with open(output_path, "rb") as f: + elf = ELFFile(f) + + load_segments = [ + s for s in elf.iter_segments() if s["p_type"] == "PT_LOAD" + ] + assert len(load_segments) == 3 + + # Verify each segment + for i, load_segment in enumerate(load_segments): + phys_addr, virt_addr, data = segments_config[i] + expected_virt = virt_addr if virt_addr is not None else phys_addr + + assert load_segment["p_paddr"] == phys_addr + assert load_segment["p_vaddr"] == expected_virt + assert load_segment.data() == data + finally: + os.unlink(output_path) + + def test_stats_include_virt_addr(self) -> None: + """Test that stats include virtual addresses for segments.""" + builder = KdumpBuilder(arch="x86_64") + builder.add_memory_segment( + phys_addr=0x100000, data=b"\x00" * 4096, virt_addr=0xFFFF888000100000 + ) + builder.add_memory_segment( + phys_addr=0x200000, data=b"\x00" * 8192 + ) # No virt_addr + + stats = builder.stats + + assert len(stats.memory_segments) == 2 + + # First segment: explicit virt_addr + phys, virt, size = stats.memory_segments[0] + assert phys == 0x100000 + assert virt == 0xFFFF888000100000 + assert size == 4096 + + # Second segment: virt_addr defaults to phys_addr + phys, virt, size = stats.memory_segments[1] + assert phys == 0x200000 + assert virt == 0x200000 # Should default to phys_addr + assert size == 8192 + + def test_stats_string_shows_virt_addr_when_different(self) -> None: + """Test that stats string shows virt_addr when it differs from phys_addr.""" + builder = KdumpBuilder(arch="x86_64") + builder.add_memory_segment( + phys_addr=0x100000, data=b"\x00" * 4096, virt_addr=0xFFFF888000100000 + ) + + stats_str = str(builder.stats) + + # Should show both addresses when they differ + assert "phys=0x" in stats_str + assert "virt=0x" in stats_str + assert "ffff888000100000" in stats_str.lower() + + def test_fluent_api_with_virt_addr(self) -> None: + """Test that the fluent API works with virtual addresses.""" + with tempfile.NamedTemporaryFile(suffix=".vmcore", delete=False) as f: + output_path = f.name + + try: + ( + KdumpBuilder(arch="x86_64") + .set_vmcoreinfo("TEST=1\n") + .add_memory_segment( + phys_addr=0x1000, data=b"\x00" * 100, virt_addr=0xFFFF888000001000 + ) + .add_memory_segment(phys_addr=0x2000, data=b"\xff" * 100) + .write(output_path) + ) + + assert os.path.exists(output_path) + assert os.path.getsize(output_path) > 0 + finally: + os.unlink(output_path) + + def test_virt_addr_with_different_architectures(self) -> None: + """Test virtual address support across different architectures.""" + for arch in ["x86_64", "aarch64", "s390x"]: + builder = KdumpBuilder(arch=arch) + builder.add_memory_segment( + phys_addr=0x100000, data=b"\x00" * 64, virt_addr=0xFFFF000000100000 + ) + + with tempfile.NamedTemporaryFile(suffix=".vmcore", delete=False) as f: + output_path = f.name + + try: + builder.write(output_path) + + with open(output_path, "rb") as f: + elf = ELFFile(f) + load_segments = [ + s for s in elf.iter_segments() if s["p_type"] == "PT_LOAD" + ] + assert len(load_segments) == 1 + assert load_segments[0]["p_vaddr"] == 0xFFFF000000100000 + assert load_segments[0]["p_paddr"] == 0x100000 + finally: + os.unlink(output_path) diff --git a/tests/test_drgn_integration.py b/tests/test_drgn_integration.py index f912a76..2d47ef5 100644 --- a/tests/test_drgn_integration.py +++ b/tests/test_drgn_integration.py @@ -149,3 +149,70 @@ def test_drgn_endianness_detection(self, vmcore_output_path: str) -> None: assert drgn.PlatformFlags.IS_LITTLE_ENDIAN in prog_arm.platform.flags assert drgn.PlatformFlags.IS_64_BIT in prog_arm.platform.flags + + def test_drgn_reads_memory_by_virtual_address( + self, vmcore_output_path: str + ) -> None: + """Test that drgn can read memory using virtual addresses. + + This is critical for SDB integration - SDB records memory by virtual + address and needs drgn to read it back the same way. + """ + # Create test data with recognizable patterns + test_data_1 = b"\xde\xad\xbe\xef" * 1024 # 4KB + test_data_2 = b"\xca\xfe\xba\xbe" * 1024 # 4KB + + # Physical and virtual addresses + phys_addr_1 = 0x100000 + virt_addr_1 = 0xFFFF888000100000 # Typical direct mapping + + phys_addr_2 = 0x200000 + virt_addr_2 = 0xFFFF888000200000 + + builder = KdumpBuilder(arch="x86_64") + builder.set_vmcoreinfo(VMCOREINFO_X86_64) + builder.add_memory_segment( + phys_addr=phys_addr_1, data=test_data_1, virt_addr=virt_addr_1 + ) + builder.add_memory_segment( + phys_addr=phys_addr_2, data=test_data_2, virt_addr=virt_addr_2 + ) + builder.write(vmcore_output_path) + + prog = drgn.Program() + prog.set_core_dump(vmcore_output_path) + + # Read memory by virtual address - this is what SDB replay needs + read_data_1 = prog.read(virt_addr_1, len(test_data_1)) + read_data_2 = prog.read(virt_addr_2, len(test_data_2)) + + assert read_data_1 == test_data_1, "Data read by virtual address should match" + assert read_data_2 == test_data_2, "Data read by virtual address should match" + + # Also verify partial reads work + partial_read = prog.read(virt_addr_1 + 4, 8) + assert partial_read == test_data_1[4:12] + + def test_drgn_reads_memory_virt_addr_defaults_to_phys( + self, vmcore_output_path: str + ) -> None: + """Test that segments without explicit virt_addr can be read by phys_addr. + + When virt_addr is not specified, it defaults to phys_addr, so reading + by the physical address value should work. + """ + test_data = b"\x11\x22\x33\x44" * 512 # 2KB + phys_addr = 0x300000 + + builder = KdumpBuilder(arch="x86_64") + builder.set_vmcoreinfo(VMCOREINFO_X86_64) + # No virt_addr specified - should default to phys_addr + builder.add_memory_segment(phys_addr=phys_addr, data=test_data) + builder.write(vmcore_output_path) + + prog = drgn.Program() + prog.set_core_dump(vmcore_output_path) + + # Read by the address (which is both phys and virt) + read_data = prog.read(phys_addr, len(test_data)) + assert read_data == test_data From e5c676d7579b52b9c2f4f66e72b16acd41edb6fb Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 29 Jan 2026 21:11:19 +0000 Subject: [PATCH 2/2] Add virtual address documentation to quickstart guide Document the new virt_addr parameter for add_memory_segment() with examples showing how to use it with drgn for virtual address reads. https://claude.ai/code/session_018wj4HefWHH9naQ29a4pwr8 --- docs/quickstart.md | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/docs/quickstart.md b/docs/quickstart.md index 2ab2b7a..89d4e4b 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -188,6 +188,48 @@ data = io.BytesIO(b'\xff' * 4096) builder.add_memory_segment(0x3000, data) ``` +## Virtual Address Support + +Memory segments can have both physical and virtual addresses. This is useful when +creating vmcores from tools that capture memory by virtual address (like debuggers +or tracing tools), allowing drgn to read memory by virtual address directly. + +```python +from kdumpling import KdumpBuilder + +builder = KdumpBuilder(arch='x86_64') +builder.set_vmcoreinfo("OSRELEASE=5.14.0\n") + +# Segment with explicit virtual address (kernel direct mapping) +builder.add_memory_segment( + phys_addr=0x100000, + data=memory_data, + virt_addr=0xffff888000100000 +) + +# Segment without virt_addr (defaults to phys_addr) +builder.add_memory_segment(phys_addr=0x200000, data=other_data) + +builder.write("with_vaddr.vmcore") +``` + +When loaded with drgn, memory can be read by virtual address: + +```python +import drgn + +prog = drgn.Program() +prog.set_core_dump("with_vaddr.vmcore") + +# Read using the virtual address +data = prog.read(0xffff888000100000, 4096) +``` + +This is particularly useful for: +- **Debugger integrations**: Tools like SDB that record memory by virtual address +- **Kernel memory captures**: Preserving the kernel's view of memory layout +- **Replay scenarios**: Re-creating the exact virtual address space for analysis + ## Validating with drgn You can verify the generated vmcore using [drgn](https://github.com/osandov/drgn):