From d0052ec3522edd9ea5ec22006931209fdb0b8932 Mon Sep 17 00:00:00 2001 From: Martin Wiesner Date: Sat, 17 Jan 2026 20:32:12 +0100 Subject: [PATCH 01/10] #420: Support multiple file Wikipedia dump archives - adds method for handling multi-file dump archives to IDecompressor - adds related impls in BZip2Decompressor, GZipDecompressor and UniversalDecompressor, each relying on SequenceInputStream - WIP -> Tests --- .../decompression/BZip2Decompressor.java | 21 ++++++++ .../decompression/GZipDecompressor.java | 21 ++++++++ .../decompression/IDecompressor.java | 24 +++++++++ .../decompression/SevenZipDecompressor.java | 9 ++++ .../decompression/UniversalDecompressor.java | 51 ++++++++++++++++--- 5 files changed, 119 insertions(+), 7 deletions(-) diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java index e1078c39..7499357b 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java @@ -20,7 +20,11 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Vector; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; @@ -55,4 +59,21 @@ public InputStream getInputStream(Path resource) throws IOException checkResource(resource); return new BZip2CompressorInputStream(new BufferedInputStream(openStream(resource))); } + + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) throws IOException { + if (resources == null || resources.isEmpty()) { + throw new IllegalArgumentException("Can't process a 'null' or 'empty' 
resources list!"); + } + resources.forEach(this::checkResource); + // if checks passed for all elements: open streams + List streams = new Vector<>(); + for (Path p: resources) { + streams.add(new BufferedInputStream(openStream(p))); + } + return new BZip2CompressorInputStream(new SequenceInputStream(Collections.enumeration(streams))); + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java index a4ec8b80..06e3bc61 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java @@ -20,7 +20,11 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Vector; import java.util.zip.GZIPInputStream; /** @@ -54,4 +58,21 @@ public InputStream getInputStream(Path resource) throws IOException return new GZIPInputStream(new BufferedInputStream(openStream(resource))); } + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) throws IOException { + if (resources == null || resources.isEmpty()) { + throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); + } + resources.forEach(this::checkResource); + // if checks passed for all elements: open streams + List streams = new Vector<>(); + for (Path p: resources) { + streams.add(new BufferedInputStream(openStream(p))); + } + return new GZIPInputStream(new SequenceInputStream(Collections.enumeration(streams))); + } + } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java 
b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java index d4da93ce..e21bc75a 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; +import java.util.List; /** * Uses an archive file path and returns an {@link InputStream}. @@ -57,4 +58,27 @@ public interface IDecompressor * @throws IOException Thrown if (other) IO errors occurred. */ InputStream getInputStream(Path resource) throws IOException; + + + /** + * Attempts to open an {@link InputStream} to a compressed archive in a multiple files format. + * These archives a combined over a sequence of files in a logical order, that is, via + * page numbers in ascending order. + *

+ * In this context, external archives are referenced via a relative or absolute path, + * including the actual file names of all multi-file resources. + * In case only a plain file names are given and no directory or path elements are contained + * in {@code resource}, an attempt is made to detect and load the resources from the classpath. + * + * @param resources References an archive via an ordered list of {@link Path paths} of all + * relevant files. Must not be {@code null}, not be {@code empty} and not + * refer to directories. All elements in {@code resources} must not + * be {@code null}. + * @return An open {@link InputStream} for a sequence of resources (multi-file) + * or {@code null} if the archive could not be found. + * + * @throws IllegalArgumentException Thrown if parameter {@code resource} is invalid. + * @throws IOException Thrown if (other) IO errors occurred. + */ + InputStream getInputStreamSequence(List resources) throws IOException; } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java index b37b1ef6..5d366cd3 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; +import java.util.List; import org.apache.commons.compress.archivers.sevenz.SevenZArchiveEntry; import org.apache.commons.compress.archivers.sevenz.SevenZFile; @@ -86,6 +87,14 @@ public InputStream getInputStream(Path resource) throws IOException { return null; } + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) { + throw new UnsupportedOperationException("Not supported yet."); + 
} + private static class SevenZipInputStreamWrapper extends FilterInputStream { private final SevenZFile archive; diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java index 981bd270..2136a41c 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java @@ -27,6 +27,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -260,12 +261,7 @@ public InputStream getInputStream(String resource) throws IOException { @Override public InputStream getInputStream(Path resource) throws IOException { - if (resource == null || resource.toString().isBlank()) { - throw new IllegalArgumentException("Can't load a 'null' or 'empty' resource!"); - } - if (Files.isDirectory(resource)) { - throw new InvalidPathException(resource.toString(), "Can't load a 'directory' as resource!"); - } + checkPath(resource); final String file = resource.toAbsolutePath().toString(); final String extension = detectExtension(file); @@ -283,7 +279,32 @@ else if (isInternalSupported(extension)) { } /** - * Check if the {@link File} specified via {@code fileName} exists. 
+ * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) throws IOException { + if (resources == null || resources.isEmpty()) { + throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); + } + resources.forEach(this::checkPath); + final Path firstElement = resources.get(0); // safe here, as resource is not empty + if (firstElement != null) { + final String file = firstElement.toAbsolutePath().toString(); + final String extension = detectExtension(file); + if (isInternalSupported(extension) && !"7z".equals(extension) /* 7z is unsupported for now */) { + return internalSupport.get(extension).getInputStreamSequence(resources); + } + else { + throw new IOException("Multi-file dumps of '" + extension + "' archives " + + "are currently not supported."); + } + } else { + throw new IllegalArgumentException("Can't process a 'null' element in the resources list!"); + } + } + + /** + * Checks if the {@link File} specified via {@code fileName} exists. * * @param resource file path to check * @return {@code true} if the file exists and can be read, {@code false} otherwise. @@ -293,4 +314,20 @@ private boolean fileExists(Path resource) return Files.exists(resource); } + + /** + * Verifies the provided {@code resource} references a valid archive. + * + * @param resource The file's name or (relative) path to read the archive from. + * @throws IllegalArgumentException Thrown if parameters were invalid. + * @throws InvalidPathException Thrown if the parameter {@code resource} referred to a directory. 
+ */ + private void checkPath(Path resource) { + if (resource == null || resource.toString().isBlank()) { + throw new IllegalArgumentException("Can't load a 'null' or 'empty' resource!"); + } + if (Files.isDirectory(resource)) { + throw new InvalidPathException(resource.toString(), "Can't load a 'directory' as resource!"); + } + } } From 9d2df54d9f8ec89cdf958959ead5f1ff6ecb9c6f Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 13:41:03 +0200 Subject: [PATCH 02/10] #420: Fix and test multi-file decompression support - BZip2Decompressor.getInputStreamSequence now enables decompressConcatenated on BZip2CompressorInputStream; without the flag only the first part of a multi-file bz2 archive was decoded. - Close already-opened part streams on failure in BZip2Decompressor and GZipDecompressor to avoid file-descriptor leaks. - UniversalDecompressor.getInputStreamSequence rejects lists whose parts do not share a single archive format, preventing silent misdecoding; also drops an unreachable null-element branch. - Replace the legacy synchronized Vector with ArrayList in the bz2 and gz sequence paths. - Tidy IDecompressor.getInputStreamSequence javadoc (typo, wrong "or null" clause, clearer contract). - Extend AbstractDecompressorTest with common contract tests for getInputStreamSequence (null list, empty list, null element, directory element); SevenZipDecompressorTest overrides these to assert UnsupportedOperationException. - Add multi-part round-trip tests for bz2 and gz that generate fixtures via BZip2CompressorOutputStream / GZIPOutputStream and verify the decompressed stream equals the byte concatenation of all parts. - Add UniversalDecompressor dispatch tests (bz2, gz), rejection of 7z multi-file, unsupported extension, and mixed-extension lists. 
--- .../decompression/BZip2Decompressor.java | 32 +++++++-- .../decompression/GZipDecompressor.java | 29 ++++++-- .../decompression/IDecompressor.java | 25 +++---- .../decompression/UniversalDecompressor.java | 29 ++++---- .../AbstractDecompressorTest.java | 45 ++++++++++++- .../decompression/BZip2DecompressorTest.java | 44 +++++++++++++ .../decompression/GZipDecompressorTest.java | 44 +++++++++++++ .../SevenZipDecompressorTest.java | 22 +++++++ .../UniversalDecompressorTest.java | 66 +++++++++++++++++++ 9 files changed, 300 insertions(+), 36 deletions(-) diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java index 7499357b..44fab608 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java @@ -22,9 +22,9 @@ import java.io.InputStream; import java.io.SequenceInputStream; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Vector; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; @@ -69,11 +69,31 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); } resources.forEach(this::checkResource); - // if checks passed for all elements: open streams - List streams = new Vector<>(); - for (Path p: resources) { - streams.add(new BufferedInputStream(openStream(p))); + final List streams = new ArrayList<>(resources.size()); + try { + for (Path p : resources) { + streams.add(new BufferedInputStream(openStream(p))); + } + // decompressConcatenated=true: each part is a self-contained bz2 stream; + // without this flag, only the first part would 
be decoded. + return new BZip2CompressorInputStream( + new SequenceInputStream(Collections.enumeration(streams)), true); + } catch (IOException | RuntimeException e) { + closeQuietly(streams, e); + throw e; + } + } + + private static void closeQuietly(List streams, Throwable primary) { + for (InputStream s : streams) { + if (s == null) { + continue; + } + try { + s.close(); + } catch (IOException suppressed) { + primary.addSuppressed(suppressed); + } } - return new BZip2CompressorInputStream(new SequenceInputStream(Collections.enumeration(streams))); } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java index 06e3bc61..51a37492 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java @@ -22,9 +22,9 @@ import java.io.InputStream; import java.io.SequenceInputStream; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Vector; import java.util.zip.GZIPInputStream; /** @@ -67,12 +67,29 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); } resources.forEach(this::checkResource); - // if checks passed for all elements: open streams - List streams = new Vector<>(); - for (Path p: resources) { - streams.add(new BufferedInputStream(openStream(p))); + final List streams = new ArrayList<>(resources.size()); + try { + for (Path p : resources) { + streams.add(new BufferedInputStream(openStream(p))); + } + // GZIPInputStream transparently reads concatenated gzip members (RFC 1952). 
+ return new GZIPInputStream(new SequenceInputStream(Collections.enumeration(streams))); + } catch (IOException | RuntimeException e) { + closeQuietly(streams, e); + throw e; } - return new GZIPInputStream(new SequenceInputStream(Collections.enumeration(streams))); } + private static void closeQuietly(List streams, Throwable primary) { + for (InputStream s : streams) { + if (s == null) { + continue; + } + try { + s.close(); + } catch (IOException suppressed) { + primary.addSuppressed(suppressed); + } + } + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java index e21bc75a..7f2b9f98 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java @@ -61,24 +61,27 @@ public interface IDecompressor /** - * Attempts to open an {@link InputStream} to a compressed archive in a multiple files format. - * These archives a combined over a sequence of files in a logical order, that is, via - * page numbers in ascending order. + * Attempts to open an {@link InputStream} to a compressed archive that is split + * across multiple files. These archives are combined over a sequence of files + * in a logical order — for example, by page-id ranges in ascending order. + * The returned stream yields the byte concatenation of the decompressed parts + * in the order provided. *

- * In this context, external archives are referenced via a relative or absolute path, - * including the actual file names of all multi-file resources. - * In case only a plain file names are given and no directory or path elements are contained - * in {@code resource}, an attempt is made to detect and load the resources from the classpath. + * External archives are referenced via a relative or absolute path, including + * the actual file name of each resource. In case only plain file names are given + * and no directory or path elements are contained, an attempt is made to detect + * and load the resources from the classpath. * * @param resources References an archive via an ordered list of {@link Path paths} of all * relevant files. Must not be {@code null}, not be {@code empty} and not * refer to directories. All elements in {@code resources} must not * be {@code null}. - * @return An open {@link InputStream} for a sequence of resources (multi-file) - * or {@code null} if the archive could not be found. + * @return An open {@link InputStream} over the concatenated decompressed contents + * of the supplied parts. * - * @throws IllegalArgumentException Thrown if parameter {@code resource} is invalid. - * @throws IOException Thrown if (other) IO errors occurred. + * @throws IllegalArgumentException Thrown if {@code resources} is invalid. + * @throws IOException Thrown if (other) IO errors occurred or the archive format + * does not support multi-file sequences. 
*/ InputStream getInputStreamSequence(List resources) throws IOException; } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java index 2136a41c..7644aa61 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; /** @@ -286,21 +287,25 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti if (resources == null || resources.isEmpty()) { throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); } + // checkPath rejects null, blank, and directory entries for every element. resources.forEach(this::checkPath); - final Path firstElement = resources.get(0); // safe here, as resource is not empty - if (firstElement != null) { - final String file = firstElement.toAbsolutePath().toString(); - final String extension = detectExtension(file); - if (isInternalSupported(extension) && !"7z".equals(extension) /* 7z is unsupported for now */) { - return internalSupport.get(extension).getInputStreamSequence(resources); + final String extension = detectExtension(resources.get(0).toAbsolutePath().toString()); + // Every part must share the same extension: mixing would otherwise be silently + // misdecoded (first-entry's decoder applied to all bytes). 
+ for (int i = 1; i < resources.size(); i++) { + final String partExtension = detectExtension(resources.get(i).toAbsolutePath().toString()); + if (!Objects.equals(extension, partExtension)) { + throw new IOException("Multi-file dumps must share a single archive format, " + + "got '" + extension + "' and '" + partExtension + "'."); } - else { - throw new IOException("Multi-file dumps of '" + extension + "' archives " + - "are currently not supported."); - } - } else { - throw new IllegalArgumentException("Can't process a 'null' element in the resources list!"); } + // 7z multi-file archives are not supported yet; only the internally supported + // streamable formats (bz2, gz) can be concatenated at the decompressor level. + if (isInternalSupported(extension) && !"7z".equals(extension)) { + return internalSupport.get(extension).getInputStreamSequence(resources); + } + throw new IOException("Multi-file dumps of '" + extension + "' archives " + + "are currently not supported."); } /** diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java index 9822c015..71a93704 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java @@ -26,6 +26,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.InvalidPathException; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -57,7 +60,47 @@ void testGetInputStreamThrowsWithNull() { void testGetInputStreamThrowsWithDirectory(@TempDir Path input) { assertThrows(InvalidPathException.class, () -> getDecompressor().getInputStream(input)); } - + + 
/** + * Exception type thrown for contract violations of + * {@link IDecompressor#getInputStreamSequence(List)} — IAE/InvalidPathException + * for formats that support the multi-file contract. Subclasses that don't + * (e.g. 7z) can override to assert {@link UnsupportedOperationException}. + */ + protected Class expectedSequenceValidationException() { + return IllegalArgumentException.class; + } + + protected Class expectedSequenceDirectoryException() { + return InvalidPathException.class; + } + + @Test + void testGetInputStreamSequenceThrowsOnNullList() { + assertThrows(expectedSequenceValidationException(), + () -> getDecompressor().getInputStreamSequence(null)); + } + + @Test + void testGetInputStreamSequenceThrowsOnEmptyList() { + assertThrows(expectedSequenceValidationException(), + () -> getDecompressor().getInputStreamSequence(Collections.emptyList())); + } + + @Test + void testGetInputStreamSequenceThrowsOnNullElement() { + final List withNull = Arrays.asList((Path) null); + assertThrows(expectedSequenceValidationException(), + () -> getDecompressor().getInputStreamSequence(withNull)); + } + + @Test + void testGetInputStreamSequenceThrowsOnDirectoryElement(@TempDir Path dir) { + final List withDir = List.of(dir); + assertThrows(expectedSequenceDirectoryException(), + () -> getDecompressor().getInputStreamSequence(withDir)); + } + protected void getAndCheck(String input) throws IOException { try (InputStream in = getDecompressor().getInputStream(input)) { assertNotNull(in); diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java index 9368575f..5ce18ea6 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java @@ -17,9 
+17,21 @@ */ package org.dkpro.jwpl.wikimachine.decompression; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -43,4 +55,36 @@ protected IDecompressor getDecompressor() { void testGetInputStream(String input) throws IOException { getAndCheck(input); } + + @Test + void testGetInputStreamSequenceConcatenatesParts(@TempDir Path dir) throws IOException { + final String contentA = "part-a payload\n"; + final String contentB = "part-b payload\n"; + final Path partA = writeBz2(dir.resolve("dump.xml-p1p10.bz2"), contentA); + final Path partB = writeBz2(dir.resolve("dump.xml-p11p20.bz2"), contentB); + + try (InputStream in = decomp.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + final String decompressed = new String(in.readAllBytes(), StandardCharsets.UTF_8); + assertEquals(contentA + contentB, decompressed); + } + } + + @Test + void testGetInputStreamSequenceSinglePartEqualsSingleFile(@TempDir Path dir) throws IOException { + final String content = "lonely payload\n"; + final Path part = writeBz2(dir.resolve("dump.xml-p1p10.bz2"), content); + + try (InputStream in = decomp.getInputStreamSequence(List.of(part))) { + assertNotNull(in); + assertEquals(content, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + private static Path writeBz2(Path out, String content) throws IOException { + try (OutputStream os = new 
BZip2CompressorOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java index 030be11f..041a45d8 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java @@ -17,9 +17,21 @@ */ package org.dkpro.jwpl.wikimachine.decompression; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.zip.GZIPOutputStream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -43,4 +55,36 @@ protected IDecompressor getDecompressor() { void testGetInputStream(String input) throws IOException { getAndCheck(input); } + + @Test + void testGetInputStreamSequenceConcatenatesParts(@TempDir Path dir) throws IOException { + final String contentA = "part-a payload\n"; + final String contentB = "part-b payload\n"; + final Path partA = writeGz(dir.resolve("dump.xml-p1p10.gz"), contentA); + final Path partB = writeGz(dir.resolve("dump.xml-p11p20.gz"), contentB); + + try (InputStream in = decomp.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + final String decompressed = new String(in.readAllBytes(), StandardCharsets.UTF_8); + assertEquals(contentA + contentB, 
decompressed); + } + } + + @Test + void testGetInputStreamSequenceSinglePartEqualsSingleFile(@TempDir Path dir) throws IOException { + final String content = "lonely payload\n"; + final Path part = writeGz(dir.resolve("dump.xml-p1p10.gz"), content); + + try (InputStream in = decomp.getInputStreamSequence(List.of(part))) { + assertNotNull(in); + assertEquals(content, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + private static Path writeGz(Path out, String content) throws IOException { + try (OutputStream os = new GZIPOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java index e4118ec5..64b3464c 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java @@ -23,6 +23,8 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -45,6 +47,20 @@ protected IDecompressor getDecompressor() { return decomp; } + /** + * 7z does not support multi-file sequences: every invocation fails fast with + * {@link UnsupportedOperationException}, regardless of input validity. 
+ */ + @Override + protected Class expectedSequenceValidationException() { + return UnsupportedOperationException.class; + } + + @Override + protected Class expectedSequenceDirectoryException() { + return UnsupportedOperationException.class; + } + @ParameterizedTest @ValueSource(strings = {"archive.txt.7z", "src/test/resources/archive.txt.7z"}) void testGetInputStream(String input) throws IOException { @@ -62,4 +78,10 @@ void testGetInputStreamWithRandomResourceName() throws IOException { final InputStream in = getDecompressor().getInputStream(UUID.randomUUID().toString()); assertNull(in); } + + @Test + void testGetInputStreamSequenceThrowsUnsupported() { + assertThrows(UnsupportedOperationException.class, + () -> decomp.getInputStreamSequence(List.of(Path.of("archive.txt.7z")))); + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java index 60563be7..a70466bd 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java @@ -19,17 +19,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.zip.GZIPOutputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import 
org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.io.TempDir; @@ -109,4 +115,64 @@ void testGetInputStreamWithExternalConfig(String input) throws IOException { assertEquals(EXPECTED_CONTENT, content); } } + + @Test + void testGetInputStreamSequenceDispatchesBz2() throws IOException { + final String a = "alpha\n"; + final String b = "beta\n"; + final Path partA = writeBz2(tmpDir.resolve("dump.xml-p1p10.bz2"), a); + final Path partB = writeBz2(tmpDir.resolve("dump.xml-p11p20.bz2"), b); + + try (InputStream in = udc.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + assertEquals(a + b, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + @Test + void testGetInputStreamSequenceDispatchesGz() throws IOException { + final String a = "alpha\n"; + final String b = "beta\n"; + final Path partA = writeGz(tmpDir.resolve("dump.xml-p1p10.gz"), a); + final Path partB = writeGz(tmpDir.resolve("dump.xml-p11p20.gz"), b); + + try (InputStream in = udc.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + assertEquals(a + b, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + @Test + void testGetInputStreamSequenceRejects7z() throws IOException { + // Create an empty placeholder so checkPath passes (no file-existence check). 
+ final Path part = Files.createFile(tmpDir.resolve("dump.xml-p1p10.7z")); + assertThrows(IOException.class, () -> udc.getInputStreamSequence(List.of(part))); + } + + @Test + void testGetInputStreamSequenceRejectsUnsupportedExtension() throws IOException { + final Path part = Files.createFile(tmpDir.resolve("dump.xml-p1p10.rar")); + assertThrows(IOException.class, () -> udc.getInputStreamSequence(List.of(part))); + } + + @Test + void testGetInputStreamSequenceRejectsMixedExtensions() throws IOException { + final Path bz2 = writeBz2(tmpDir.resolve("dump.xml-p1p10.bz2"), "x\n"); + final Path gz = writeGz(tmpDir.resolve("dump.xml-p11p20.gz"), "y\n"); + assertThrows(IOException.class, () -> udc.getInputStreamSequence(List.of(bz2, gz))); + } + + private static Path writeBz2(Path out, String content) throws IOException { + try (OutputStream os = new BZip2CompressorOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } + + private static Path writeGz(Path out, String content) throws IOException { + try (OutputStream os = new GZIPOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } } From be9357d08b067238bee0c067e66e7117179f54dc Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 13:43:23 +0200 Subject: [PATCH 03/10] #420: Add multi-part XML dump parse plumbing Each Wikimedia multi-file dump part (e.g. pages-articles1.xml-p1p10.bz2) is a standalone XML document with its own root and preamble, so the decompressed byte concatenation is not well-formed XML and cannot be fed to a single SAXParser.parse() call. This commit adds the plumbing to parse such a sequence as one logical document without touching the existing single-stream pipeline. 
- MultiPartDumpWriter (mwdumper): DumpWriter decorator that forwards writeStartWiki / writeSiteinfo only on the first invocation, swallows per-part writeEndWiki / close, and exposes finish() to emit the single terminal writeEndWiki and close the delegate exactly once. - AbstractXmlDumpReader: split readDump() into a protected doParse() that runs SAX without closing the writer, plus the existing readDump() which calls doParse() followed by writer.close(). The single-stream contract is unchanged. - MultiPartXmlDumpReader (wikimachine): readDumps(parts, writer, factory) iterates parts, instantiates a fresh reader per part via the factory, routes events through MultiPartDumpWriter, and guarantees finish() runs on the failure path as well. - Unit tests for MultiPartDumpWriter (lifecycle collapsing, passthrough, idempotent finish, null-delegate rejection). - Integration tests for MultiPartXmlDumpReader using in-memory XML parts against WikiXMLDumpReader, asserting exact event ordering, null/empty input rejection, and delegate-close on parse failure. Consumers (XML2Binary, DataMachineGenerator, TimeMachineGenerator) are not yet wired up to this API; that is a separate follow-up. 
--- .../importer/MultiPartDumpWriter.java | 124 ++++++++++ .../importer/MultiPartDumpWriterTest.java | 158 +++++++++++++ .../dump/xml/AbstractXmlDumpReader.java | 15 +- .../dump/xml/MultiPartXmlDumpReader.java | 109 +++++++++ .../dump/xml/MultiPartXmlDumpReaderTest.java | 211 ++++++++++++++++++ 5 files changed, 616 insertions(+), 1 deletion(-) create mode 100644 dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java create mode 100644 dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java create mode 100644 dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java create mode 100644 dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java diff --git a/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java b/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java new file mode 100644 index 00000000..4247c405 --- /dev/null +++ b/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.mwdumper.importer; + +import java.io.IOException; +import java.util.Objects; + +/** + * A {@link DumpWriter} decorator that collapses the per-part lifecycle events of a + * multi-file Wikipedia dump into a single logical document. + *

+ * Wikimedia ships large dumps split across several files (e.g. {@code pages-articles1.xml-p1p10.bz2},
+ * {@code pages-articles1.xml-p11p20.bz2}); each file is a self-contained XML document with its own
+ * {@code <mediawiki>} root and {@code <siteinfo>} preamble. When those parts are parsed
+ * sequentially against the same {@link DumpWriter}, the delegate would otherwise receive repeated
+ * {@link #writeStartWiki()}/{@link #writeEndWiki()}/{@link #writeSiteinfo(Siteinfo)} events.
+ * This wrapper:
+ * <ul>
+ *   <li>forwards {@link #writeStartWiki()} and {@link #writeSiteinfo(Siteinfo)} only on the first
+ *       invocation;</li>
+ *   <li>swallows {@link #writeEndWiki()} and {@link #close()} so that the caller controls the
+ *       true end-of-document via {@link #finish()};</li>
+ *   <li>passes {@link #writeStartPage(Page)}, {@link #writeEndPage()}, and
+ *       {@link #writeRevision(Revision)} through verbatim.</li>
+ * </ul>
+ * Call {@link #finish()} exactly once after all parts have been parsed to emit the single + * {@code writeEndWiki()} and close the delegate. + */ +public final class MultiPartDumpWriter + implements DumpWriter +{ + + private final DumpWriter delegate; + private boolean wikiStarted; + private boolean siteinfoWritten; + private boolean finished; + + public MultiPartDumpWriter(DumpWriter delegate) + { + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null"); + } + + @Override + public void writeStartWiki() throws IOException + { + if (!wikiStarted) { + delegate.writeStartWiki(); + wikiStarted = true; + } + } + + @Override + public void writeEndWiki() + { + // Deferred until finish() — each part emits but we only want one + // logical end-of-wiki event for the combined document. + } + + @Override + public void writeSiteinfo(Siteinfo info) throws IOException + { + if (!siteinfoWritten) { + delegate.writeSiteinfo(info); + siteinfoWritten = true; + } + } + + @Override + public void writeStartPage(Page page) throws IOException + { + delegate.writeStartPage(page); + } + + @Override + public void writeEndPage() throws IOException + { + delegate.writeEndPage(); + } + + @Override + public void writeRevision(Revision revision) throws IOException + { + delegate.writeRevision(revision); + } + + @Override + public void close() + { + // Deferred until finish() so per-part parses can reuse the same underlying writer. + } + + /** + * Emits the final {@code writeEndWiki()} to the delegate (only if at least one + * {@code writeStartWiki()} was observed) and closes it. Idempotent. + * + * @throws IOException Thrown on delegate I/O errors during end-of-wiki or close. 
+ */ + public void finish() throws IOException + { + if (finished) { + return; + } + finished = true; + if (wikiStarted) { + delegate.writeEndWiki(); + } + delegate.close(); + } +} diff --git a/dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java b/dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java new file mode 100644 index 00000000..68e84301 --- /dev/null +++ b/dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.mwdumper.importer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +class MultiPartDumpWriterTest +{ + + private static final class RecordingDumpWriter + implements DumpWriter + { + final List events = new ArrayList<>(); + + @Override + public void close() + { + events.add("close"); + } + + @Override + public void writeStartWiki() + { + events.add("startWiki"); + } + + @Override + public void writeEndWiki() + { + events.add("endWiki"); + } + + @Override + public void writeSiteinfo(Siteinfo info) + { + events.add("siteinfo"); + } + + @Override + public void writeStartPage(Page page) + { + events.add("startPage:" + page.Id); + } + + @Override + public void writeEndPage() + { + events.add("endPage"); + } + + @Override + public void writeRevision(Revision revision) + { + events.add("revision:" + revision.Id); + } + } + + @Test + void requiresNonNullDelegate() + { + assertThrows(NullPointerException.class, () -> new MultiPartDumpWriter(null)); + } + + @Test + void collapsesLifecycleAndPassesThroughPageEvents() throws IOException + { + RecordingDumpWriter delegate = new RecordingDumpWriter(); + MultiPartDumpWriter sut = new MultiPartDumpWriter(delegate); + + // Part 1 + sut.writeStartWiki(); + sut.writeSiteinfo(new Siteinfo()); + Page page1 = new Page(); + page1.Id = 1; + sut.writeStartPage(page1); + Revision rev1 = new Revision(); + rev1.Id = 10; + sut.writeRevision(rev1); + sut.writeEndPage(); + sut.writeEndWiki(); // swallowed + sut.close(); // swallowed + + // Part 2 + sut.writeStartWiki(); // collapsed + sut.writeSiteinfo(new Siteinfo()); // collapsed + Page page2 = new Page(); + page2.Id = 2; + sut.writeStartPage(page2); + Revision rev2 = new Revision(); + rev2.Id = 20; + sut.writeRevision(rev2); + sut.writeEndPage(); + 
sut.writeEndWiki(); // swallowed + sut.close(); // swallowed + + sut.finish(); // emits endWiki + close exactly once + + assertEquals(List.of( + "startWiki", + "siteinfo", + "startPage:1", + "revision:10", + "endPage", + "startPage:2", + "revision:20", + "endPage", + "endWiki", + "close" + ), delegate.events); + } + + @Test + void finishWithoutStartWikiSkipsEndWikiButStillCloses() throws IOException + { + RecordingDumpWriter delegate = new RecordingDumpWriter(); + MultiPartDumpWriter sut = new MultiPartDumpWriter(delegate); + + sut.finish(); + + assertEquals(List.of("close"), delegate.events); + } + + @Test + void finishIsIdempotent() throws IOException + { + RecordingDumpWriter delegate = new RecordingDumpWriter(); + MultiPartDumpWriter sut = new MultiPartDumpWriter(delegate); + + sut.writeStartWiki(); + sut.finish(); + sut.finish(); + sut.finish(); + + assertEquals(List.of("startWiki", "endWiki", "close"), delegate.events); + } +} diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java index a40e0746..6a3066c4 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java @@ -234,6 +234,20 @@ public AbstractXmlDumpReader(InputStream inputStream, DumpWriter writer) * @throws IOException Thrown if errors occurred during parsing. */ public void readDump() throws IOException + { + doParse(); + writer.close(); + } + + /** + * SAX-parses the bound input stream against this handler, but does not close the + * {@link DumpWriter}. Exposed for multi-part pipelines where several readers share a single + * writer and the caller is responsible for closing it after the last part has been consumed. + * + * @throws IOException Thrown if errors occurred during parsing. 
+ * @see org.dkpro.jwpl.mwdumper.importer.MultiPartDumpWriter + */ + protected void doParse() throws IOException { try { SAXParserFactory factory = SAXParserFactory.newInstance(); @@ -245,7 +259,6 @@ public void readDump() throws IOException catch (ParserConfigurationException | SAXException e) { throw new IOException(e); } - writer.close(); } /** diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java new file mode 100644 index 00000000..d365b367 --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.dump.xml; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Objects; +import java.util.function.BiFunction; + +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; +import org.dkpro.jwpl.mwdumper.importer.MultiPartDumpWriter; + +/** + * Parses a multi-file Wikipedia dump as a single logical document. + *

+ * Each Wikimedia dump part (e.g. {@code pages-articles1.xml-p1p10.bz2}) is a standalone
+ * XML document with its own {@code <mediawiki>} root and {@code <siteinfo>} preamble, so
+ * the decompressed byte concatenation is not well-formed XML and cannot be fed to a
+ * single {@link javax.xml.parsers.SAXParser#parse} call. This helper instead parses each
+ * part with a fresh {@link AbstractXmlDumpReader} while routing all events to one
+ * {@link MultiPartDumpWriter} — duplicate {@code writeStartWiki}/{@code writeSiteinfo}/
+ * {@code writeEndWiki} events across parts are collapsed, and the underlying delegate
+ * writer only sees one logical document.
+ */
+public final class MultiPartXmlDumpReader
+{
+
+    /**
+     * Factory that produces a fresh {@link AbstractXmlDumpReader} (typically a concrete
+     * subclass such as {@code SimpleXmlDumpReader} or a {@code timemachine} reader) for
+     * a given part's {@link InputStream} and the shared {@link DumpWriter}.
+     */
+    @FunctionalInterface
+    public interface ReaderFactory
+        extends BiFunction<InputStream, DumpWriter, AbstractXmlDumpReader>
+    {
+    }
+
+    private MultiPartXmlDumpReader()
+    {
+        // static-only
+    }
+
+    /**
+     * Parses every part in {@code parts} against the same {@code writer}. Events from
+     * the individual parts are funnelled through a {@link MultiPartDumpWriter} so the
+     * delegate observes the combined stream as a single {@code <mediawiki>} document.
+     * The delegate writer is closed by this method after the last part has been consumed.
+     *
+     * @param parts Ordered list of decompressed XML {@link InputStream streams}. Must
+     *            not be {@code null}, must not be empty, and must not contain
+     *            {@code null} elements.
+     * @param writer The underlying {@link DumpWriter} to flush events into.
+     * @param factory Instantiates a fresh reader per part — typically a method reference
+     *            such as {@code SimpleXmlDumpReader::new}.
+     * @throws IOException Thrown on I/O or SAX errors encountered while parsing
+     *             any part.
+ * @throws IllegalArgumentException If {@code parts} is null, empty, or contains a null + * element. + */ + public static void readDumps(List parts, DumpWriter writer, ReaderFactory factory) + throws IOException + { + if (parts == null || parts.isEmpty()) { + throw new IllegalArgumentException("'parts' must not be null or empty."); + } + Objects.requireNonNull(writer, "'writer' must not be null."); + Objects.requireNonNull(factory, "'factory' must not be null."); + for (int i = 0; i < parts.size(); i++) { + if (parts.get(i) == null) { + throw new IllegalArgumentException("'parts[" + i + "]' is null."); + } + } + + final MultiPartDumpWriter wrapper = new MultiPartDumpWriter(writer); + try { + for (InputStream part : parts) { + final AbstractXmlDumpReader reader = factory.apply(part, wrapper); + reader.doParse(); + } + wrapper.finish(); + } + catch (IOException | RuntimeException e) { + try { + wrapper.finish(); + } + catch (IOException suppressed) { + e.addSuppressed(suppressed); + } + throw e; + } + } +} diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java new file mode 100644 index 00000000..c39538bd --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.dump.xml; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; +import org.dkpro.jwpl.mwdumper.importer.Page; +import org.dkpro.jwpl.mwdumper.importer.Revision; +import org.dkpro.jwpl.mwdumper.importer.Siteinfo; +import org.junit.jupiter.api.Test; + +class MultiPartXmlDumpReaderTest +{ + + private static final String PART_HEADER = + "\n" + + "\n" + + " \n" + + " Test Wiki\n" + + " http://test.example/\n" + + " MediaWiki-test\n" + + " first-letter\n" + + " \n" + + " \n" + + " Talk\n" + + " \n" + + " \n"; + + private static final String PART_FOOTER = "\n"; + + private static String part(int pageId, int revisionId, String title) + { + return PART_HEADER + + " \n" + + " " + title + "\n" + + " " + pageId + "\n" + + " \n" + + " " + revisionId + "\n" + + " 2020-01-01T00:00:00Z\n" + + " \n" + + " Alice\n" + + " 100\n" + + " \n" + + " Body of " + title + "\n" + + " \n" + + " \n" + + PART_FOOTER; + } + + private static InputStream stream(String xml) + { + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + @Test + void parsesTwoPartsAsSingleLogicalDocument() throws IOException + { + final RecordingDumpWriter delegate = new RecordingDumpWriter(); + + MultiPartXmlDumpReader.readDumps( + 
List.of(stream(part(1, 10, "Page One")), stream(part(2, 20, "Page Two"))), + delegate, + WikiXMLDumpReader::new); + + assertEquals(List.of( + "startWiki", + "siteinfo", + "startPage:1:Page One", + "revision:10", + "endPage", + "startPage:2:Page Two", + "revision:20", + "endPage", + "endWiki", + "close" + ), delegate.events); + } + + @Test + void singlePartBehavesLikeSingleDocument() throws IOException + { + final RecordingDumpWriter delegate = new RecordingDumpWriter(); + + MultiPartXmlDumpReader.readDumps( + List.of(stream(part(7, 77, "Solo"))), + delegate, + WikiXMLDumpReader::new); + + assertEquals(List.of( + "startWiki", + "siteinfo", + "startPage:7:Solo", + "revision:77", + "endPage", + "endWiki", + "close" + ), delegate.events); + } + + @Test + void rejectsNullPartsList() + { + assertThrows(IllegalArgumentException.class, () -> + MultiPartXmlDumpReader.readDumps(null, new RecordingDumpWriter(), WikiXMLDumpReader::new)); + } + + @Test + void rejectsEmptyPartsList() + { + assertThrows(IllegalArgumentException.class, () -> + MultiPartXmlDumpReader.readDumps(Collections.emptyList(), + new RecordingDumpWriter(), WikiXMLDumpReader::new)); + } + + @Test + void rejectsNullElementInPartsList() + { + final List parts = new ArrayList<>(); + parts.add(stream(part(1, 10, "Page One"))); + parts.add(null); + assertThrows(IllegalArgumentException.class, () -> + MultiPartXmlDumpReader.readDumps(parts, new RecordingDumpWriter(), WikiXMLDumpReader::new)); + } + + @Test + void closesDelegateEvenIfParsingFails() + { + final RecordingDumpWriter delegate = new RecordingDumpWriter(); + final InputStream malformed = stream("oops"); + + assertThrows(IOException.class, () -> + MultiPartXmlDumpReader.readDumps(List.of(malformed), delegate, WikiXMLDumpReader::new)); + + // Even on failure, the delegate writer must be closed — nothing else leaked. 
+ assertEquals(List.of("close"), delegate.events); + } + + private static final class RecordingDumpWriter + implements DumpWriter + { + final List events = new ArrayList<>(); + + @Override + public void close() + { + events.add("close"); + } + + @Override + public void writeStartWiki() + { + events.add("startWiki"); + } + + @Override + public void writeEndWiki() + { + events.add("endWiki"); + } + + @Override + public void writeSiteinfo(Siteinfo info) + { + events.add("siteinfo"); + } + + @Override + public void writeStartPage(Page page) + { + events.add("startPage:" + page.Id + ":" + page.Title); + } + + @Override + public void writeEndPage() + { + events.add("endPage"); + } + + @Override + public void writeRevision(Revision revision) + { + events.add("revision:" + revision.Id); + } + } +} From e4bdc5940b2ec1235b49c27a0e42de9affab204e Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 13:53:48 +0200 Subject: [PATCH 04/10] #420: Discover and group multi-part Wikipedia dump files Wikimedia publishes large XML dumps split across several files named -.xml-pp. (e.g. pages-articles1.xml- p1p297012.bz2). The existing DataMachineFiles scanner used .contains("pages-articles.xml") and therefore silently ignored every multi-part file. This commit adds grouping + ordering support without wiring the XML consumers yet. - New util DumpFileDiscovery (wikimachine): recognises the multi-part page-range suffix, matches role names under both single-file and multi-part schemes (rejecting look-alikes such as pages-articles-multistream), and orders a collection of parts by ascending start page id. - DataMachineFiles: internally stores pages-articles and pages-meta-current as ordered List. Legacy singular getters keep returning the first part for backwards compatibility; new getInputPagesArticlesFiles() / getInputPagesMetaCurrentFiles() expose the full ordered list. Role matching uses DumpFileDiscovery and now correctly picks up multi-part names. 
SQL roles (pagelinks, categorylinks) stay single-file. - TimeMachineFiles: metaHistoryFiles is now a List. Legacy setMetaHistoryFile/getMetaHistoryFile still work (singleton list); new setMetaHistoryFiles/getMetaHistoryFiles accept and expose the ordered list; checkAll() verifies every part is readable. - Tests for DumpFileDiscovery (pattern matching, rejection of look-alikes, stable ordering with ranged and unranged files), DataMachineFiles multi-part grouping, and TimeMachineFiles list setter/getter behaviour and validation. --- .../datamachine/domain/DataMachineFiles.java | 105 ++++++++--- .../domain/DataMachineFilesTest.java | 71 +++++++- .../timemachine/domain/TimeMachineFiles.java | 72 +++++++- .../domain/TimeMachineFilesTest.java | 60 ++++++- .../wikimachine/util/DumpFileDiscovery.java | 163 +++++++++++++++++ .../util/DumpFileDiscoveryTest.java | 167 ++++++++++++++++++ 6 files changed, 606 insertions(+), 32 deletions(-) create mode 100644 dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java create mode 100644 dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java diff --git a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java index a9d7cb3f..8a231243 100644 --- a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java +++ b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java @@ -19,14 +19,25 @@ import java.io.File; import java.io.FileFilter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; import org.dkpro.jwpl.wikimachine.debug.ILogger; import org.dkpro.jwpl.wikimachine.domain.Files; +import org.dkpro.jwpl.wikimachine.util.DumpFileDiscovery; /** * A {@link Files} implementation specific for the DataMachine 
tool. * It defines file name constants and provides methods for * input/output directory building rules and checks. + *

+ * Wikimedia publishes large XML dumps split across several files (see + * {@link DumpFileDiscovery}). For the {@code pages-articles} and {@code pages-meta-current} + * roles this class keeps the ordered list of parts and exposes both the legacy singular + * getter (first part of the ordered list, for backwards compatibility) and a list getter + * that returns every part. * * @see Files */ @@ -35,8 +46,8 @@ public class DataMachineFiles { private final static String INPUT_PAGELINKS = "pagelinks.sql"; private final static String INPUT_CATEGORYLINKS = "categorylinks.sql"; - private final static String INPUT_PAGESARTICLES = "pages-articles.xml"; - private final static String INPUT_PAGESMETACURRENT = "pages-meta-current.xml"; + private final static String INPUT_PAGESARTICLES = "pages-articles"; + private final static String INPUT_PAGESMETACURRENT = "pages-meta-current"; private final static String GENERATED_PAGE = "page.bin"; private final static String GENERATED_REVISION = "revision.bin"; @@ -48,13 +59,15 @@ public class DataMachineFiles private final static String ARCHIVE_EXTENSION = ".gz"; + private final static Set SUPPORTED_EXTENSIONS = Set.of("bz2", "gz", "7z"); + private File dataDirectory = new File("."); private boolean compressGeneratedFiles = false; private File inputPagelinks = null; - private File inputPagesarticles = null; private File inputCategorylinks = null; - private File inputPagesMetaCurrent = null; + private List inputPagesarticles = new ArrayList<>(); + private List inputPagesMetaCurrent = new ArrayList<>(); /** * Instantiates a {@link Files} object with the specified {@code logger}. 
@@ -77,9 +90,9 @@ public DataMachineFiles(DataMachineFiles files) super(files); this.dataDirectory = files.dataDirectory; this.inputPagelinks = files.inputPagelinks; - this.inputPagesarticles = files.inputPagesarticles; + this.inputPagesarticles = new ArrayList<>(files.inputPagesarticles); this.inputCategorylinks = files.inputCategorylinks; - this.inputPagesMetaCurrent = files.inputPagesMetaCurrent; + this.inputPagesMetaCurrent = new ArrayList<>(files.inputPagesMetaCurrent); this.compressGeneratedFiles = files.compressGeneratedFiles; } @@ -108,30 +121,34 @@ private boolean checkDataMachineSourceFiles() { final FileFilter supportedFormatFilter = file -> { final String name = file.getName(); - // See UniversalDecompressor for all built-in decompression formats. For now: return name.endsWith(".7z") || name.endsWith(".gz") || name.endsWith(".bz2"); }; final File[] files = dataDirectory.listFiles(supportedFormatFilter); - if (files != null && files.length > 2) { + if (files != null && files.length >= 3) { + final List articleParts = new ArrayList<>(); + final List metaCurrentParts = new ArrayList<>(); for (File currentFile : files) { - String currentFileName = currentFile.getName(); - if (currentFileName.contains(INPUT_PAGESARTICLES)) { - inputPagesarticles = currentFile; + final String name = currentFile.getName(); + if (DumpFileDiscovery.matchesRole(name, INPUT_PAGESARTICLES, SUPPORTED_EXTENSIONS)) { + articleParts.add(currentFile); + } + else if (DumpFileDiscovery.matchesRole(name, INPUT_PAGESMETACURRENT, + SUPPORTED_EXTENSIONS)) { + metaCurrentParts.add(currentFile); } - else if (currentFileName.contains(INPUT_PAGELINKS)) { + else if (name.contains(INPUT_PAGELINKS)) { inputPagelinks = currentFile; } - else if (currentFileName.contains(INPUT_CATEGORYLINKS)) { + else if (name.contains(INPUT_CATEGORYLINKS)) { inputCategorylinks = currentFile; } - else if (currentFileName.contains(INPUT_PAGESMETACURRENT)) { - inputPagesMetaCurrent = currentFile; - } } + 
inputPagesarticles = DumpFileDiscovery.orderByPageRange(articleParts); + inputPagesMetaCurrent = DumpFileDiscovery.orderByPageRange(metaCurrentParts); } // either inputPagesarticles or inputPagesMetaCurrent have to be placed // in the input directory - return !((inputPagesarticles == null && inputPagesMetaCurrent == null) + return !((inputPagesarticles.isEmpty() && inputPagesMetaCurrent.isEmpty()) || inputPagelinks == null || inputCategorylinks == null); } @@ -179,14 +196,29 @@ public String getInputPageLinks() } /** - * @return Retrieves the absolute path of the {@code pages-articles.xml} file. + * @return Retrieves the absolute path of the first {@code pages-articles.xml} part, + * or {@code null} if none was discovered. For multi-part dumps, prefer + * {@link #getInputPagesArticlesFiles()}. */ public String getInputPagesArticles() { - if (inputPagesarticles == null) { + if (inputPagesarticles.isEmpty()) { checkDataMachineSourceFiles(); } - return inputPagesarticles != null ? inputPagesarticles.getAbsolutePath() : null; + return inputPagesarticles.isEmpty() ? null : inputPagesarticles.get(0).getAbsolutePath(); + } + + /** + * @return Absolute paths of all {@code pages-articles.xml} parts ordered by ascending page + * range. Empty if the dump is not available. A single-file dump yields a list of + * size 1. + */ + public List getInputPagesArticlesFiles() + { + if (inputPagesarticles.isEmpty()) { + checkDataMachineSourceFiles(); + } + return toAbsolutePathList(inputPagesarticles); } /** @@ -201,14 +233,41 @@ public String getInputCategoryLinks() } /** - * @return Retrieves the absolute path of the {@code pages-meta-current.xml} file. + * @return Retrieves the absolute path of the first {@code pages-meta-current.xml} part, + * or {@code null} if none was discovered. For multi-part dumps, prefer + * {@link #getInputPagesMetaCurrentFiles()}. 
*/ public String getInputPagesMetaCurrent() { - if (inputPagesMetaCurrent == null) { + if (inputPagesMetaCurrent.isEmpty()) { + checkDataMachineSourceFiles(); + } + return inputPagesMetaCurrent.isEmpty() ? null + : inputPagesMetaCurrent.get(0).getAbsolutePath(); + } + + /** + * @return Absolute paths of all {@code pages-meta-current.xml} parts ordered by ascending + * page range. Empty if the dump is not available. + */ + public List getInputPagesMetaCurrentFiles() + { + if (inputPagesMetaCurrent.isEmpty()) { checkDataMachineSourceFiles(); } - return inputPagesMetaCurrent != null ? inputPagesMetaCurrent.getAbsolutePath() : null; + return toAbsolutePathList(inputPagesMetaCurrent); + } + + private static List toAbsolutePathList(List files) + { + if (files.isEmpty()) { + return Collections.emptyList(); + } + final List paths = new ArrayList<>(files.size()); + for (File f : files) { + paths.add(f.getAbsolutePath()); + } + return paths; } private String getGeneratedPath(String fileName) diff --git a/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java index ec8ec719..f9804298 100644 --- a/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java +++ b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java @@ -28,12 +28,14 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import org.dkpro.jwpl.datamachine.factory.DefaultDataMachineEnvironmentFactory; import org.dkpro.jwpl.wikimachine.factory.IEnvironmentFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import 
org.junit.jupiter.params.provider.ValueSource; @@ -224,5 +226,72 @@ void testGetGeneratedText(boolean useCompression) { void testGetGeneratedDiscussions() { assertEquals(TEST_OUTPUT_DIR + "discussions.bin", dmFiles.getGeneratedDiscussions()); } - + + @Test + void testGetInputPagesArticlesFilesSingleFile() { + assertEquals(List.of(TEST_OUTPUT_DIR + "pages-articles.xml.bz2"), + dmFiles.getInputPagesArticlesFiles()); + } + + @Test + void testMultiFilePagesArticlesAreGroupedAndOrdered(@TempDir Path dir) throws IOException { + Files.createFile(dir.resolve("dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2")); + Files.createFile(dir.resolve("dewiki-20260101-pages-articles1.xml-p1p297012.bz2")); + Files.createFile(dir.resolve("dewiki-20260101-pages-articles3.xml-p1262094p2762093.bz2")); + Files.createFile(dir.resolve("pagelinks.sql.gz")); + Files.createFile(dir.resolve("categorylinks.sql.gz")); + + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + files.checkAll(); + + List parts = files.getInputPagesArticlesFiles(); + assertEquals(3, parts.size()); + assertTrue(parts.get(0).endsWith("pages-articles1.xml-p1p297012.bz2")); + assertTrue(parts.get(1).endsWith("pages-articles2.xml-p297013p1262093.bz2")); + assertTrue(parts.get(2).endsWith("pages-articles3.xml-p1262094p2762093.bz2")); + + // Legacy getter returns the first part for backwards compatibility. 
+ assertTrue(files.getInputPagesArticles().endsWith("pages-articles1.xml-p1p297012.bz2")); + } + + @Test + void testPagesArticlesMultistreamDoesNotMatchPagesArticlesRole(@TempDir Path dir) + throws IOException { + Files.createFile(dir.resolve("dewiki-20260101-pages-articles-multistream.xml.bz2")); + Files.createFile(dir.resolve("dewiki-20260101-pages-articles.xml.bz2")); + Files.createFile(dir.resolve("pagelinks.sql.gz")); + Files.createFile(dir.resolve("categorylinks.sql.gz")); + + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + files.checkAll(); + + assertEquals(1, files.getInputPagesArticlesFiles().size()); + assertTrue(files.getInputPagesArticlesFiles().get(0).endsWith("pages-articles.xml.bz2")); + } + + @Test + void testMultiFilePagesMetaCurrent(@TempDir Path dir) throws IOException { + Files.createFile(dir.resolve("enwiki-20250601-pages-meta-current2.xml-p100p200.bz2")); + Files.createFile(dir.resolve("enwiki-20250601-pages-meta-current1.xml-p1p99.bz2")); + Files.createFile(dir.resolve("pagelinks.sql.gz")); + Files.createFile(dir.resolve("categorylinks.sql.gz")); + + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + files.checkAll(); + + List parts = files.getInputPagesMetaCurrentFiles(); + assertEquals(2, parts.size()); + assertTrue(parts.get(0).endsWith("pages-meta-current1.xml-p1p99.bz2")); + assertTrue(parts.get(1).endsWith("pages-meta-current2.xml-p100p200.bz2")); + } + + @Test + void testGetInputPagesArticlesFilesEmptyForMissingDump(@TempDir Path dir) { + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + assertTrue(files.getInputPagesArticlesFiles().isEmpty()); + } } diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java 
b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java index 20f64484..13262b5e 100644 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java @@ -19,6 +19,9 @@ import java.io.File; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.dkpro.jwpl.wikimachine.debug.ILogger; import org.dkpro.jwpl.wikimachine.domain.Files; @@ -32,7 +35,7 @@ public class TimeMachineFiles private static final String NO_METAHISTORY = "meta history file not found"; private static final String NO_PAGELINKS = "page links file not found"; - private String metaHistoryFile; + private final List metaHistoryFiles = new ArrayList<>(); private String pageLinksFile; private String categoryLinksFile; private String timeStamp = ""; @@ -45,7 +48,7 @@ public TimeMachineFiles(ILogger logger) public TimeMachineFiles(TimeMachineFiles files) { super(files); - this.metaHistoryFile = files.metaHistoryFile; + this.metaHistoryFiles.addAll(files.metaHistoryFiles); this.pageLinksFile = files.pageLinksFile; this.categoryLinksFile = files.categoryLinksFile; } @@ -60,14 +63,59 @@ public void setTimestamp(Timestamp timestamp) timeStamp = TimestampUtil.toMediaWikiString(timestamp) + File.separator; } + /** + * @return The first (or only) meta-history dump file, or {@code null} if none was configured. + * For multi-part dumps, prefer {@link #getMetaHistoryFiles()}. + */ public String getMetaHistoryFile() { - return metaHistoryFile; + return metaHistoryFiles.isEmpty() ? null : metaHistoryFiles.get(0); } + /** + * Replaces the meta-history configuration with the single given path. + * + * @param metaHistoryFile Absolute or relative path to the meta-history dump. May be + * {@code null} to clear the configuration. 
+ */ public void setMetaHistoryFile(String metaHistoryFile) { - this.metaHistoryFile = metaHistoryFile; + this.metaHistoryFiles.clear(); + if (metaHistoryFile != null) { + this.metaHistoryFiles.add(metaHistoryFile); + } + } + + /** + * @return An unmodifiable view of the ordered meta-history dump parts. A single-file dump + * yields a list of size 1; never {@code null}. + */ + public List getMetaHistoryFiles() + { + return Collections.unmodifiableList(metaHistoryFiles); + } + + /** + * Replaces the meta-history configuration with the given ordered list of parts. The order + * must reflect the ascending page-range order expected by the downstream multi-part XML + * pipeline. + * + * @param metaHistoryFiles Ordered list of part paths. Must not be {@code null} or empty and + * must not contain {@code null} elements. + * @throws IllegalArgumentException If the argument violates the above. + */ + public void setMetaHistoryFiles(List metaHistoryFiles) + { + if (metaHistoryFiles == null || metaHistoryFiles.isEmpty()) { + throw new IllegalArgumentException("'metaHistoryFiles' must not be null or empty."); + } + for (int i = 0; i < metaHistoryFiles.size(); i++) { + if (metaHistoryFiles.get(i) == null) { + throw new IllegalArgumentException("'metaHistoryFiles[" + i + "]' is null."); + } + } + this.metaHistoryFiles.clear(); + this.metaHistoryFiles.addAll(metaHistoryFiles); } public String getPageLinksFile() @@ -111,9 +159,19 @@ protected String getOutputPath(String fileName) @Override public boolean checkAll() { - return checkOutputDirectory() - && checkInputFile(metaHistoryFile, NO_METAHISTORY) - && checkInputFile(pageLinksFile, NO_PAGELINKS) + if (!checkOutputDirectory()) { + return false; + } + if (metaHistoryFiles.isEmpty()) { + logger.log(NO_METAHISTORY); + return false; + } + for (String part : metaHistoryFiles) { + if (!checkInputFile(part, NO_METAHISTORY)) { + return false; + } + } + return checkInputFile(pageLinksFile, NO_PAGELINKS) && 
checkInputFile(categoryLinksFile, NO_CATEGORYLINKS); } } diff --git a/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java index 32f3e391..724bdd18 100644 --- a/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java +++ b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java @@ -18,8 +18,10 @@ package org.dkpro.jwpl.timemachine.domain; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -28,12 +30,16 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.dkpro.jwpl.timemachine.factory.DefaultTimeMachineEnvironmentFactory; import org.dkpro.jwpl.wikimachine.factory.IEnvironmentFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; @@ -173,5 +179,57 @@ void testGetOutputPageRedirects() { void testGetOutputMetadata() { assertEquals(OUTPUT_DIR + File.separator + "MetaData.txt", tmFiles.getOutputMetadata()); } - + + @Test + void testGetMetaHistoryFilesReturnsSingletonForLegacySetter() { + assertEquals(List.of(mockMetaHistory.getAbsolutePath()), tmFiles.getMetaHistoryFiles()); + } + + @Test 
+ void testSetMetaHistoryFileNullClearsList() { + tmFiles.setMetaHistoryFile(null); + assertTrue(tmFiles.getMetaHistoryFiles().isEmpty()); + assertNull(tmFiles.getMetaHistoryFile()); + } + + @Test + void testSetMetaHistoryFilesPreservesOrder(@TempDir Path dir) throws IOException { + Path p1 = Files.createFile( + dir.resolve("enwiki-20250601-pages-meta-history1.xml-p1p812.bz2")); + Path p2 = Files.createFile( + dir.resolve("enwiki-20250601-pages-meta-history1.xml-p813p1418.bz2")); + Path p3 = Files.createFile( + dir.resolve("enwiki-20250601-pages-meta-history2.xml-p1419p2000.bz2")); + + List parts = Arrays.asList(p1.toString(), p2.toString(), p3.toString()); + tmFiles.setMetaHistoryFiles(parts); + + assertEquals(parts, tmFiles.getMetaHistoryFiles()); + assertEquals(p1.toString(), tmFiles.getMetaHistoryFile()); + assertTrue(tmFiles.checkAll()); + } + + @Test + void testSetMetaHistoryFilesRejectsInvalid() { + assertThrows(IllegalArgumentException.class, () -> tmFiles.setMetaHistoryFiles(null)); + assertThrows(IllegalArgumentException.class, + () -> tmFiles.setMetaHistoryFiles(Collections.emptyList())); + assertThrows(IllegalArgumentException.class, + () -> tmFiles.setMetaHistoryFiles(Arrays.asList("a", null))); + } + + @Test + void testGetMetaHistoryFilesIsUnmodifiable() { + assertThrows(UnsupportedOperationException.class, + () -> tmFiles.getMetaHistoryFiles().add("should-fail")); + } + + @Test + void testCheckAllFailsIfAnyMetaHistoryPartMissing(@TempDir Path dir) throws IOException { + Path p1 = Files.createFile(dir.resolve("history1.bz2")); + Path p2 = dir.resolve("history2.bz2"); // not created + + tmFiles.setMetaHistoryFiles(Arrays.asList(p1.toString(), p2.toString())); + assertFalse(tmFiles.checkAll()); + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java new file mode 100644 index 00000000..1dc7b7c7 --- 
/dev/null +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Helpers to recognise and order Wikimedia multi-part dump files. + *

+ * Wikimedia publishes large XML dumps split across several files using the naming scheme + * {@code -.xml-pp.} (for example + * {@code dewiki-20260101-pages-articles1.xml-p1p297012.bz2}, + * {@code dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2}). Older / smaller dumps use the + * single-file scheme {@code -.xml.}. This utility: + *

 + * <ul>
 + * <li>detects whether a filename is part of a multi-part dump — {@link #hasPageRange};</li>
 + * <li>matches filenames against a known dump role (e.g. {@code pages-articles}) accepting both
 + * the single-file and multi-part naming schemes — {@link #matchesRole};</li>
 + * <li>orders a collection of multi-part files by ascending start page id —
 + * {@link #orderByPageRange}.</li>
 + * </ul>
+ * All methods tolerate absolute paths: only the file name is inspected. + */ +public final class DumpFileDiscovery +{ + + /** + * Suffix that marks a multi-part dump file; captures the start and end page id of the range. + * Example match on {@code foo-pages-articles1.xml-p297013p1262093.bz2} yields start=297013, + * end=1262093. + */ + private static final Pattern PAGE_RANGE = Pattern.compile("-p(\\d+)p(\\d+)(?=\\.)"); + + private DumpFileDiscovery() + { + // static-only + } + + /** + * @param fileName A file name or path whose last component is inspected. + * @return {@code true} if the name carries the multi-part suffix {@code -pp.}. + */ + public static boolean hasPageRange(String fileName) + { + if (fileName == null) { + return false; + } + return PAGE_RANGE.matcher(lastSegment(fileName)).find(); + } + + /** + * @param file Any {@link File}, absolute or relative. + * @return {@code true} if the file's name carries the multi-part suffix. + */ + public static boolean hasPageRange(File file) + { + return file != null && hasPageRange(file.getName()); + } + + /** + * Matches a filename against a known Wikimedia dump role under either naming scheme: + *
 + * <ul>
 + * <li>single-file: {@code <role>.xml.<ext>}</li>
 + * <li>multi-part: {@code <role><n>.xml-p<start>p<end>.<ext>}</li>
 + * </ul>
+ * The matcher is anchored on the role substring and the {@code .xml} marker, so similarly + * named dumps such as {@code pages-articles-multistream.xml.bz2} are correctly rejected when + * the requested role is {@code pages-articles}. + * + * @param fileName File name (or path whose last segment is the file name). + * @param role Role token as it appears in the dump name, e.g. {@code pages-articles}, + * {@code pages-meta-current}, {@code pages-meta-history}. + * @param extensions Supported archive extensions without dot, e.g. {@code ["bz2", "gz", "7z"]}. + * @return {@code true} if {@code fileName} matches the role under either scheme. + */ + public static boolean matchesRole(String fileName, String role, Collection extensions) + { + if (fileName == null || role == null || extensions == null || extensions.isEmpty()) { + return false; + } + final String name = lastSegment(fileName); + final String extAlt = String.join("|", extensions); + // Either: ....xml. + // or: ...\d+.xml-pp. + final Pattern p = Pattern.compile( + ".*" + Pattern.quote(role) + "(\\d+\\.xml-p\\d+p\\d+|\\.xml)\\.(" + extAlt + ")$"); + return p.matcher(name).matches(); + } + + /** + * Returns a new list containing {@code files} ordered for multi-part consumption: files with a + * {@code -pp} suffix are sorted by ascending start page id; files without such a + * suffix preserve their relative input order and come first. Stable for equal starts. + * + * @param files Input files in any order. {@code null} elements are rejected. + * @return A new ordered {@link List}. + * @throws IllegalArgumentException If {@code files} is {@code null} or contains a null element. 
+ */ + public static List orderByPageRange(Collection files) + { + if (files == null) { + throw new IllegalArgumentException("'files' must not be null."); + } + final List out = new ArrayList<>(files.size()); + for (File f : files) { + if (f == null) { + throw new IllegalArgumentException("'files' contains a null element."); + } + out.add(f); + } + out.sort(Comparator.comparingLong(DumpFileDiscovery::pageRangeStart)); + return out; + } + + /** + * @return The start page id encoded in the file name's {@code -pp} suffix, or + * {@link Long#MIN_VALUE} if absent. Files without a range therefore sort before + * ranged parts in {@link #orderByPageRange}. + */ + static long pageRangeStart(File file) + { + if (file == null) { + return Long.MIN_VALUE; + } + final Matcher m = PAGE_RANGE.matcher(file.getName()); + if (!m.find()) { + return Long.MIN_VALUE; + } + try { + return Long.parseLong(m.group(1)); + } + catch (NumberFormatException e) { + return Long.MIN_VALUE; + } + } + + private static String lastSegment(String path) + { + final int slash = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\')); + return slash < 0 ? path : path.substring(slash + 1); + } +} diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java new file mode 100644 index 00000000..fbadb6ba --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class DumpFileDiscoveryTest +{ + + private static final Set EXTENSIONS = Set.of("bz2", "gz", "7z"); + + // hasPageRange ------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles1.xml-p1p297012.bz2", + "enwiki-20250601-pages-meta-history2.xml-p297013p1262093.bz2", + "/tmp/enwiki-20250601-pages-articles1.xml-p1p812.bz2", + "foo-pages-articles27.xml-p0p999999.7z" + }) + void hasPageRangeTrue(String name) + { + assertTrue(DumpFileDiscovery.hasPageRange(name)); + } + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles.xml.bz2", + "pagelinks.sql.gz", + "random.txt", + "pages-articles1.xml-p1.bz2" + }) + void hasPageRangeFalse(String name) + { + assertFalse(DumpFileDiscovery.hasPageRange(name)); + } + + @Test + void hasPageRangeHandlesNull() + { + assertFalse(DumpFileDiscovery.hasPageRange((String) null)); + assertFalse(DumpFileDiscovery.hasPageRange((File) null)); + } + + // matchesRole 
-------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles.xml.bz2", + "enwiki-20250601-pages-articles.xml.gz", + "dewiki-20260101-pages-articles1.xml-p1p297012.bz2", + "dewiki-20260101-pages-articles3.xml-p2762094p3376257.bz2" + }) + void matchesRolePagesArticles(String name) + { + assertTrue(DumpFileDiscovery.matchesRole(name, "pages-articles", EXTENSIONS)); + } + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles-multistream.xml.bz2", + "dewiki-20260101-pages-articles-multistream-index.txt.bz2", + "dewiki-20260101-pages-meta-current.xml.bz2", + "pagelinks.sql.gz", + "dewiki-20260101-pages-articles.xml.rar" + }) + void doesNotMatchRolePagesArticles(String name) + { + assertFalse(DumpFileDiscovery.matchesRole(name, "pages-articles", EXTENSIONS)); + } + + @Test + void matchesRoleMetaCurrent() + { + assertTrue(DumpFileDiscovery.matchesRole( + "enwiki-20250601-pages-meta-current.xml.bz2", "pages-meta-current", EXTENSIONS)); + assertTrue(DumpFileDiscovery.matchesRole( + "enwiki-20250601-pages-meta-current1.xml-p1p100.bz2", + "pages-meta-current", EXTENSIONS)); + } + + @Test + void matchesRoleHandlesNullsAndEmptyExtensions() + { + assertFalse(DumpFileDiscovery.matchesRole(null, "pages-articles", EXTENSIONS)); + assertFalse(DumpFileDiscovery.matchesRole("x.bz2", null, EXTENSIONS)); + assertFalse(DumpFileDiscovery.matchesRole("x.bz2", "pages-articles", null)); + assertFalse(DumpFileDiscovery.matchesRole( + "x.bz2", "pages-articles", Collections.emptySet())); + } + + // orderByPageRange --------------------------------------------------------- + + @Test + void orderByPageRangeSortsByStart() + { + final File a = new File("dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2"); + final File b = new File("dewiki-20260101-pages-articles1.xml-p1p297012.bz2"); + final File c = new File("dewiki-20260101-pages-articles3.xml-p2762094p3376257.bz2"); + final File 
d = new File("dewiki-20260101-pages-articles3.xml-p1262094p2762093.bz2"); + + final List ordered = DumpFileDiscovery.orderByPageRange(Arrays.asList(a, b, c, d)); + + assertEquals(Arrays.asList(b, a, d, c), ordered); + } + + @Test + void orderByPageRangePutsUnrangedFirstStable() + { + final File single = new File("dewiki-20260101-pages-articles.xml.bz2"); + final File p2 = new File("dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2"); + final File p1 = new File("dewiki-20260101-pages-articles1.xml-p1p297012.bz2"); + + final List ordered = DumpFileDiscovery.orderByPageRange(Arrays.asList(p2, single, p1)); + + assertEquals(Arrays.asList(single, p1, p2), ordered); + } + + @Test + void orderByPageRangeReturnsEmptyForEmptyInput() + { + assertTrue(DumpFileDiscovery.orderByPageRange(Collections.emptyList()).isEmpty()); + } + + @Test + void orderByPageRangeRejectsNullInput() + { + assertThrows(IllegalArgumentException.class, + () -> DumpFileDiscovery.orderByPageRange(null)); + } + + @Test + void orderByPageRangeRejectsNullElement() + { + assertThrows(IllegalArgumentException.class, + () -> DumpFileDiscovery.orderByPageRange(Arrays.asList( + new File("pages-articles1.xml-p1p10.bz2"), null))); + } +} From dfe2ff7cc8d48012724f68c68aa2749d1da9ac38 Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 13:56:49 +0200 Subject: [PATCH 05/10] #420: Decompress multi-part archives per-part, then concatenate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier implementation fed a SequenceInputStream of raw compressed parts to a single decompressor (GZIPInputStream for gz, relying on RFC 1952 multi-member support; BZip2CompressorInputStream with decompressConcatenated=true for bz2). That worked locally but failed on Java 17 / HSQLDB CI runs with decoding stopping after the first part — only one part's content was returned. 
Root cause: GZIPInputStream detects a subsequent concatenated member by inspecting the underlying stream's available() count after the trailer of the current member. SequenceInputStream's available() returns the *current* underlying stream's count, which is zero at the boundary between parts before the switch to the next part has been triggered by a read(). On timing/buffer-sensitive paths this made GZIPInputStream conclude the stream ended after part one. Fix: wrap each part in its own decompressor (GZIPInputStream / BZip2CompressorInputStream) and concatenate the *decompressed* streams with SequenceInputStream. Identical semantic result, no dependence on compressed-side multi-member detection, and consistent between bz2 and gz. The existing multi-part round-trip tests still pass and the CI-reproducible failure is gone. --- .../wikimachine/decompression/BZip2Decompressor.java | 11 ++++++----- .../wikimachine/decompression/GZipDecompressor.java | 10 +++++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java index 44fab608..ed479fff 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java @@ -69,15 +69,16 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); } resources.forEach(this::checkResource); + // Decompress each part independently and concatenate the decompressed streams. + // This mirrors the gzip impl and avoids depending on the compressed-side + // multi-stream detection heuristic (which is sensitive to the underlying + // stream's available() at part boundaries). 
final List streams = new ArrayList<>(resources.size()); try { for (Path p : resources) { - streams.add(new BufferedInputStream(openStream(p))); + streams.add(new BZip2CompressorInputStream(new BufferedInputStream(openStream(p)))); } - // decompressConcatenated=true: each part is a self-contained bz2 stream; - // without this flag, only the first part would be decoded. - return new BZip2CompressorInputStream( - new SequenceInputStream(Collections.enumeration(streams)), true); + return new SequenceInputStream(Collections.enumeration(streams)); } catch (IOException | RuntimeException e) { closeQuietly(streams, e); throw e; diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java index 51a37492..4972a183 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java @@ -67,13 +67,17 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); } resources.forEach(this::checkResource); + // Wrap every part in its own GZIPInputStream and concatenate the decompressed streams. + // We previously fed a concatenated compressed stream to a single GZIPInputStream relying + // on RFC 1952 multi-member support, but GZIPInputStream detects subsequent members via + // the underlying stream's available() count — which is zero at SequenceInputStream's + // boundary between parts, so decoding stopped after the first part on some platforms. 
final List streams = new ArrayList<>(resources.size()); try { for (Path p : resources) { - streams.add(new BufferedInputStream(openStream(p))); + streams.add(new GZIPInputStream(new BufferedInputStream(openStream(p)))); } - // GZIPInputStream transparently reads concatenated gzip members (RFC 1952). - return new GZIPInputStream(new SequenceInputStream(Collections.enumeration(streams))); + return new SequenceInputStream(Collections.enumeration(streams)); } catch (IOException | RuntimeException e) { closeQuietly(streams, e); throw e; From 93ae30e9280754a9d02d2013db8ce06a3724942f Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 14:20:30 +0200 Subject: [PATCH 06/10] #420: Wire DataMachine and TimeMachine pipelines to multi-part dumps Plumbing was in place (DumpFileDiscovery grouping in DataMachineFiles / TimeMachineFiles, MultiPartXmlDumpReader for per-part SAX dispatch with a shared DumpWriter); this commit connects the consumer pipelines so a dump split across several Wikimedia files is ingested as a single logical document. - XML2Binary (datamachine): new XML2Binary(List, DataMachineFiles) constructor routes the list through MultiPartXmlDumpReader.readDumps with SimpleXmlDumpReader::new. The legacy single-stream constructor is unchanged. - DataMachineGenerator.processInputDump: opens one decompressed stream per configured pages-articles / pages-meta-current part (favouring meta-current when present) and hands the list to the multi-part constructor. Single-file dumps collapse to a one-element list with identical semantics to the legacy path. - DumpTableInputStream (wikimachine): adds default initialize(List, DumpTableEnum) that dispatches to the single-stream initializer for size-1 lists and throws UnsupportedOperationException otherwise. Subclasses override when they can read across parts. 
- XMLDumpTableInputStreamThread (timemachine): new constructor that drives MultiPartXmlDumpReader for a List, selecting the Page/Revision/Text reader per DumpTableEnum. Single-stream mode unchanged. - XMLDumpTableInputStream: overrides the list-based initialize to use the new multi-part thread; the single-stream path is preserved. - TimeMachineGenerator: each of createRevisionParser, createPageParser, and createTextParser now opens one decompressed stream per configured meta-history part via a shared helper and passes the list to DumpTableInputStream.initialize. --- .../domain/DataMachineGenerator.java | 48 +++-- .../jwpl/datamachine/dump/xml/XML2Binary.java | 31 +++- .../domain/XML2BinaryMultiPartTest.java | 157 ++++++++++++++++ .../domain/TimeMachineGenerator.java | 41 +++-- .../dump/xml/XMLDumpTableInputStream.java | 29 ++- .../xml/XMLDumpTableInputStreamThread.java | 98 ++++++++-- .../XMLDumpTableInputStreamMultiPartTest.java | 173 ++++++++++++++++++ .../dump/xml/DumpTableInputStream.java | 25 +++ 8 files changed, 535 insertions(+), 67 deletions(-) create mode 100644 dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java create mode 100644 dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java diff --git a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java index 4e95ad26..3157f0c4 100644 --- a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java +++ b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java @@ -18,6 +18,9 @@ package org.dkpro.jwpl.datamachine.domain; import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import org.dkpro.jwpl.datamachine.dump.xml.XML2Binary; 
import org.dkpro.jwpl.wikimachine.domain.AbstractSnapshotGenerator; @@ -85,7 +88,14 @@ private void processInputDump() throws IOException { logger.log("Parsing input dumps..."); - new XML2Binary(decompressor.getInputStream(getPagesArticlesFile()), files); + final List parts = getPagesArticlesFiles(); + final List streams = new ArrayList<>(parts.size()); + for (String part : parts) { + streams.add(decompressor.getInputStream(part)); + } + // A single-file dump reduces to a one-element list; the multi-part XML2Binary + // constructor handles both cases uniformly via MultiPartXmlDumpReader. + new XML2Binary(streams, files); dumpVersionProcessor.setDumpVersions(new IDumpVersion[] { version }); @@ -111,30 +121,28 @@ private void processInputDump() throws IOException } /** - * Parses either {@code pages-articles.xml} or {@code pages-meta-current.xml}. - * If both files exist in the input directory {@code pages-meta-current.xml} will be favored. + * Selects the input articles dump in preferred order: {@code pages-meta-current} (when + * available — includes discussions) falls back to {@code pages-articles}. Returns every + * part of the selected role in ascending page-range order; a single-file dump yields a + * list of size 1. * - * @return the input articles dump + * @return the ordered list of input articles dump parts + * @throws IOException If neither dump role is present. 
*/ - private String getPagesArticlesFile() + private List getPagesArticlesFiles() throws IOException { - String pagesArticlesFile = null; - String parseMessage = null; - - // Use of minimal dump only with articles - if (files.getInputPagesArticles() != null) { - pagesArticlesFile = files.getInputPagesArticles(); - parseMessage = "Discussions are unavailable"; + final List metaCurrent = files.getInputPagesMetaCurrentFiles(); + if (!metaCurrent.isEmpty()) { + logger.log("Discussions are available"); + return metaCurrent; } - - // Use of dump with discussions - if (files.getInputPagesMetaCurrent() != null) { - pagesArticlesFile = files.getInputPagesMetaCurrent(); - parseMessage = "Discussions are available"; + final List articles = files.getInputPagesArticlesFiles(); + if (!articles.isEmpty()) { + logger.log("Discussions are unavailable"); + return articles; } - - logger.log(parseMessage); - return pagesArticlesFile; + throw new IOException("No pages-articles or pages-meta-current dump found in the input " + + "directory."); } private PageParser createPageParser() throws IOException diff --git a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java index 62a731c2..cfb2cca5 100644 --- a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java +++ b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java @@ -19,10 +19,13 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import org.dkpro.jwpl.datamachine.domain.DataMachineFiles; +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; import org.dkpro.jwpl.mwdumper.importer.NamespaceFilter; import org.dkpro.jwpl.mwdumper.importer.XmlDumpReader; +import org.dkpro.jwpl.wikimachine.dump.xml.MultiPartXmlDumpReader; /** * Use org.mediawiki.importer engine to parse the XML-Dump (only useful fields) and store it 
to @@ -51,16 +54,36 @@ public class XML2Binary */ public XML2Binary(InputStream iStream, DataMachineFiles files) throws IOException { + final DumpWriter writer = new NamespaceFilter(new SimpleBinaryDumpWriter(files), + ENABLED_NAMESPACES); if (USE_MODIFIED_PARSER) { // modified parser, skips faulty tags - new SimpleXmlDumpReader(iStream, - new NamespaceFilter(new SimpleBinaryDumpWriter(files), ENABLED_NAMESPACES)).readDump(); + new SimpleXmlDumpReader(iStream, writer).readDump(); } else { // original MWDumper parser, very sensible to not closed tags - new XmlDumpReader(iStream, - new NamespaceFilter(new SimpleBinaryDumpWriter(files), ENABLED_NAMESPACES)).readDump(); + new XmlDumpReader(iStream, writer).readDump(); } } + /** + * Instantiates an {@link XML2Binary} for a multi-part Wikipedia XML dump. Every stream in + * {@code iStreams} must be a self-contained XML document with its own {@code } + * root; events across parts are collapsed into a single logical document by the underlying + * {@link MultiPartXmlDumpReader}. + * + * @param iStreams Ordered list of XML part streams (ascending page-range). Must not be + * {@code null} or empty; must not contain {@code null} elements. + * @param files The {@link DataMachineFiles} configuration to apply. + * @throws IOException Thrown if IO errors occurred during processing. + */ + public XML2Binary(List iStreams, DataMachineFiles files) throws IOException + { + final DumpWriter writer = new NamespaceFilter(new SimpleBinaryDumpWriter(files), + ENABLED_NAMESPACES); + // The modified parser is always used for multi-part — the original XmlDumpReader is + // only kept as a fallback for its stricter single-document parsing. 
+ MultiPartXmlDumpReader.readDumps(iStreams, writer, SimpleXmlDumpReader::new); + } + } diff --git a/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java new file mode 100644 index 00000000..92b559db --- /dev/null +++ b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.datamachine.domain; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.dkpro.jwpl.datamachine.dump.xml.XML2Binary; +import org.dkpro.jwpl.datamachine.factory.DefaultDataMachineEnvironmentFactory; +import org.dkpro.jwpl.wikimachine.factory.IEnvironmentFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Exercises the multi-part {@link XML2Binary#XML2Binary(java.util.List, DataMachineFiles)} + * constructor end-to-end: two self-contained XML parts fed through the multi-part pipeline + * must produce the same {@code page.bin} / {@code revision.bin} / {@code text.bin} bytes as a + * single-document dump containing the same pages. 
+ */ +class XML2BinaryMultiPartTest +{ + + private static final IEnvironmentFactory FACTORY = + DefaultDataMachineEnvironmentFactory.getInstance(); + + private static final String PART_HEADER = + "\n" + + "\n" + + " \n" + + " Test Wiki\n" + + " http://test.example/\n" + + " MediaWiki-test\n" + + " first-letter\n" + + " \n" + + " \n" + + " Talk\n" + + " \n" + + " \n"; + private static final String PART_FOOTER = "\n"; + + private static String pageBlock(int pageId, int revisionId, String title) + { + return " \n" + + " " + title + "\n" + + " " + pageId + "\n" + + " \n" + + " " + revisionId + "\n" + + " 2020-01-01T00:00:00Z\n" + + " \n" + + " Alice\n" + + " 100\n" + + " \n" + + " Body of " + title + "\n" + + " \n" + + " \n"; + } + + private static InputStream stream(String xml) + { + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static DataMachineFiles filesFor(Path dir) + { + DataMachineFiles files = new DataMachineFiles(FACTORY.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + return files; + } + + @Test + void multiPartProducesSameBinariesAsEquivalentSingleDocument( + @TempDir Path singleDir, @TempDir Path multiDir) throws IOException + { + // Two pages. + final String pageOne = pageBlock(1, 10, "Page One"); + final String pageTwo = pageBlock(2, 20, "Page Two"); + + // Single-document reference: one containing both pages. + final String singleDoc = PART_HEADER + pageOne + pageTwo + PART_FOOTER; + DataMachineFiles singleFiles = filesFor(singleDir); + try (InputStream in = stream(singleDoc)) { + new XML2Binary(in, singleFiles); + } + + // Multi-part input: same two pages as two self-contained XML documents. 
+ final String partA = PART_HEADER + pageOne + PART_FOOTER; + final String partB = PART_HEADER + pageTwo + PART_FOOTER; + DataMachineFiles multiFiles = filesFor(multiDir); + new XML2Binary(List.of(stream(partA), stream(partB)), multiFiles); + + assertBinariesEqual(singleFiles, multiFiles); + } + + @Test + void multiPartWithSingleElementListIsEquivalentToSingleStream( + @TempDir Path singleDir, @TempDir Path multiDir) throws IOException + { + final String doc = PART_HEADER + pageBlock(7, 77, "Solo") + PART_FOOTER; + + DataMachineFiles singleFiles = filesFor(singleDir); + try (InputStream in = stream(doc)) { + new XML2Binary(in, singleFiles); + } + + DataMachineFiles multiFiles = filesFor(multiDir); + new XML2Binary(List.of(stream(doc)), multiFiles); + + assertBinariesEqual(singleFiles, multiFiles); + } + + private static void assertBinariesEqual(DataMachineFiles a, DataMachineFiles b) + throws IOException + { + final Path pageA = Path.of(a.getGeneratedPage()); + final Path pageB = Path.of(b.getGeneratedPage()); + assertTrue(Files.exists(pageA) && Files.size(pageA) > 0, + "page.bin missing or empty (single)"); + assertTrue(Files.exists(pageB) && Files.size(pageB) > 0, + "page.bin missing or empty (multi)"); + assertArrayEquals(Files.readAllBytes(pageA), Files.readAllBytes(pageB), + "page.bin mismatch between multi-part and equivalent single document"); + + final Path revA = Path.of(a.getGeneratedRevision()); + final Path revB = Path.of(b.getGeneratedRevision()); + assertArrayEquals(Files.readAllBytes(revA), Files.readAllBytes(revB), + "revision.bin mismatch"); + + final Path textA = Path.of(a.getGeneratedText()); + final Path textB = Path.of(b.getGeneratedText()); + assertArrayEquals(Files.readAllBytes(textA), Files.readAllBytes(textB), + "text.bin mismatch"); + } +} diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java 
b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java index 35193987..ed2e6951 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.InputStream; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; import org.dkpro.jwpl.wikimachine.domain.AbstractSnapshotGenerator; import org.dkpro.jwpl.wikimachine.domain.Files; @@ -138,35 +140,24 @@ private void processInputDumps() throws IOException private RevisionParser createRevisionParser() throws IOException { - - String metahistory = initialFiles.getMetaHistoryFile(); - - DumpTableInputStream revisionTableInputStream = envFactory - .getDumpTableInputStream(); - revisionTableInputStream.initialize(decompressor.getInputStream(metahistory), - DumpTableEnum.REVISION); + DumpTableInputStream revisionTableInputStream = envFactory.getDumpTableInputStream(); + revisionTableInputStream.initialize(openMetaHistoryStreams(), DumpTableEnum.REVISION); RevisionParser revisionParser = envFactory.getRevisionParser(); revisionParser.setInputStream(revisionTableInputStream); return revisionParser; - } private PageParser createPageParser() throws IOException { - - String metahistory = initialFiles.getMetaHistoryFile(); - DumpTableInputStream pageTableInputStream = envFactory.getDumpTableInputStream(); - pageTableInputStream.initialize(decompressor.getInputStream(metahistory), - DumpTableEnum.PAGE); + pageTableInputStream.initialize(openMetaHistoryStreams(), DumpTableEnum.PAGE); PageParser pageParser = envFactory.getPageParser(); pageParser.setInputStream(pageTableInputStream); return pageParser; - } private CategorylinksParser createCategorylinksParser() throws IOException @@ -191,18 +182,28 @@ private PagelinksParser createPagelinksParser() throws 
IOException private TextParser createTextParser() throws IOException { - - String metahistory = initialFiles.getMetaHistoryFile(); - DumpTableInputStream textTableInputStream = envFactory.getDumpTableInputStream(); - textTableInputStream.initialize(decompressor.getInputStream(metahistory), - DumpTableEnum.TEXT); + textTableInputStream.initialize(openMetaHistoryStreams(), DumpTableEnum.TEXT); TextParser textParser = envFactory.getTextParser(); textParser.setInputStream(textTableInputStream); return textParser; - } + /** + * Opens a decompressed stream per configured meta-history part, preserving order. A + * single-file dump yields a list of size 1; the call site hands the list to + * {@link DumpTableInputStream#initialize(List, DumpTableEnum)} which dispatches to the + * single- or multi-part SAX pipeline transparently. + */ + private List openMetaHistoryStreams() throws IOException + { + final List parts = initialFiles.getMetaHistoryFiles(); + final List streams = new ArrayList<>(parts.size()); + for (String part : parts) { + streams.add(decompressor.getInputStream(part)); + } + return streams; + } } diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java index 29141a14..57e1a2c7 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.util.List; import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableEnum; import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableInputStream; @@ -56,7 +57,30 @@ public class XMLDumpTableInputStream @Override public void initialize(InputStream inputStream, DumpTableEnum table) throws IOException 
{ + final PipedOutputStream decodedStream = openPipe(); + xmlInputThread = new XMLDumpTableInputStreamThread(inputStream, decodedStream, table); + xmlInputThread.start(); + } + /** + * Multi-part equivalent of {@link #initialize(InputStream, DumpTableEnum)}. Each element of + * {@code inputStreams} is a self-contained Wikipedia XML dump part; SAX events across parts + * are collapsed into a single logical document before being written to the SQL sink. + * + * @param inputStreams Ordered list of XML part streams (ascending page-range). Must not be + * {@code null} or empty and must not contain {@code null} elements. + * @param table The type of table to dump. + * @throws IOException Thrown if IO errors occurred while setting up the pipe. + */ + public void initialize(List inputStreams, DumpTableEnum table) throws IOException + { + final PipedOutputStream decodedStream = openPipe(); + xmlInputThread = new XMLDumpTableInputStreamThread(inputStreams, decodedStream, table); + xmlInputThread.start(); + } + + private PipedOutputStream openPipe() throws IOException + { /* * piped input stream, that allows to read from a decodedStream */ @@ -67,10 +91,7 @@ public void initialize(InputStream inputStream, DumpTableEnum table) throws IOEx */ PipedOutputStream decodedStream = new PipedOutputStream(unbufferedResult); result = new BufferedInputStream(unbufferedResult, BUFFERSIZE); - - xmlInputThread = new XMLDumpTableInputStreamThread(inputStream, decodedStream, table); - xmlInputThread.start(); - + return decodedStream; } @Override diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java index c45e0146..b8fc58f6 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java +++ 
b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java @@ -21,10 +21,13 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.invoke.MethodHandles; +import java.util.List; +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; import org.dkpro.jwpl.mwdumper.importer.NamespaceFilter; import org.dkpro.jwpl.wikimachine.dump.xml.AbstractXmlDumpReader; import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableEnum; +import org.dkpro.jwpl.wikimachine.dump.xml.MultiPartXmlDumpReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,40 +52,87 @@ class XMLDumpTableInputStreamThread */ private AbstractXmlDumpReader xmlReader; + /** + * Populated in multi-part mode. When non-null, {@link #run()} drives parsing through + * {@link MultiPartXmlDumpReader} instead of a single {@link AbstractXmlDumpReader#readDump()}. + */ + private final List parts; + private final DumpWriter multiPartWriter; + private final MultiPartXmlDumpReader.ReaderFactory readerFactory; + /** * completion flag for a conversion process */ private boolean isComplete; /** - * Initiate input and output streams + * Initiate input and output streams for a single-file dump. * - * @param iStream - * XML input stream - * @param oStream - * SQL output stream - * @throws IOException - * Thrown in case errors occurred. + * @param iStream XML input stream + * @param oStream SQL output stream + * @param table Kind of table output expected. + * @throws IOException Thrown in case errors occurred. */ public XMLDumpTableInputStreamThread(InputStream iStream, OutputStream oStream, DumpTableEnum table) throws IOException { super("xml2sql"); + this.parts = null; + this.multiPartWriter = null; + this.readerFactory = null; + this.xmlReader = createReader(iStream, createWriter(oStream, table), table); + } + /** + * Initiate input and output streams for a multi-part dump. 
Every element of {@code iStreams} + * is a self-contained XML document; SAX events across parts are collapsed into a single + * logical document by {@link MultiPartXmlDumpReader}. + * + * @param iStreams Ordered list of XML part input streams (ascending page-range). + * @param oStream SQL output stream. + * @param table Kind of table output expected. + */ + public XMLDumpTableInputStreamThread(List iStreams, OutputStream oStream, + DumpTableEnum table) + { + super("xml2sql"); + this.parts = iStreams; + this.multiPartWriter = createWriter(oStream, table); + this.readerFactory = readerFactoryFor(table); + } + + private static DumpWriter createWriter(OutputStream oStream, DumpTableEnum table) + { + switch (table) { + case PAGE: + return new NamespaceFilter(new PageWriter(oStream), ENABLED_NAMESPACES); + case REVISION: + return new NamespaceFilter(new RevisionWriter(oStream), ENABLED_NAMESPACES); + case TEXT: + return new NamespaceFilter(new TextWriter(oStream), ENABLED_NAMESPACES); + default: + throw new IllegalArgumentException("Unsupported table type: " + table); + } + } + + private static AbstractXmlDumpReader createReader(InputStream in, DumpWriter writer, + DumpTableEnum table) + { + return readerFactoryFor(table).apply(in, writer); + } + + private static MultiPartXmlDumpReader.ReaderFactory readerFactoryFor(DumpTableEnum table) + { switch (table) { case PAGE: - xmlReader = new PageReader(iStream, - new NamespaceFilter(new PageWriter(oStream), ENABLED_NAMESPACES)); - break; + return PageReader::new; case REVISION: - xmlReader = new RevisionReader(iStream, - new NamespaceFilter(new RevisionWriter(oStream), ENABLED_NAMESPACES)); - break; + return RevisionReader::new; case TEXT: - xmlReader = new TextReader(iStream, - new NamespaceFilter(new TextWriter(oStream), ENABLED_NAMESPACES)); - break; + return TextReader::new; + default: + throw new IllegalArgumentException("Unsupported table type: " + table); } } @@ -91,7 +141,12 @@ public synchronized void run() { try { 
isComplete = false; - xmlReader.readDump(); + if (parts != null) { + MultiPartXmlDumpReader.readDumps(parts, multiPartWriter, readerFactory); + } + else { + xmlReader.readDump(); + } isComplete = true; } catch (IOException e) { @@ -101,12 +156,17 @@ public synchronized void run() } /** - * Abort a conversion + * Abort a conversion. + *

+ * Only supported in single-file mode. In multi-part mode the abort flag is recorded but does + * not interrupt an in-flight SAX parse — callers must let the current part finish. */ public synchronized void abort() { if (!isComplete) { - xmlReader.abort(); + if (xmlReader != null) { + xmlReader.abort(); + } isComplete = true; } } diff --git a/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java new file mode 100644 index 00000000..f917ab70 --- /dev/null +++ b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.timemachine.dump.xml; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableEnum; +import org.junit.jupiter.api.Test; + +/** + * Feeds two self-contained XML parts through the multi-part overload of + * {@link XMLDumpTableInputStream#initialize(List, DumpTableEnum)} and asserts the piped SQL + * output bytes equal the output of a single-document dump with the same pages. + */ +class XMLDumpTableInputStreamMultiPartTest +{ + + private static final String PART_HEADER = + "\n" + + "\n" + + " \n" + + " Test Wiki\n" + + " http://test.example/\n" + + " MediaWiki-test\n" + + " first-letter\n" + + " \n" + + " \n" + + " Talk\n" + + " \n" + + " \n"; + private static final String PART_FOOTER = "\n"; + + private static String pageBlock(int pageId, int revisionId, String title) + { + return " \n" + + " " + title + "\n" + + " " + pageId + "\n" + + " \n" + + " " + revisionId + "\n" + + " 2020-01-01T00:00:00Z\n" + + " \n" + + " Alice\n" + + " 100\n" + + " \n" + + " Body of " + title + "\n" + + " \n" + + " \n"; + } + + private static InputStream stream(String xml) + { + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static byte[] runSingle(String xml, DumpTableEnum table) throws IOException + { + XMLDumpTableInputStream sut = new XMLDumpTableInputStream(); + sut.initialize(stream(xml), table); + return drain(sut); + } + + private static byte[] runMulti(List parts, DumpTableEnum table) throws IOException + { + XMLDumpTableInputStream sut = new XMLDumpTableInputStream(); + sut.initialize(parts, table); + return drain(sut); + } + + private static byte[] drain(XMLDumpTableInputStream sut) 
throws IOException + { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final byte[] buf = new byte[4096]; + int n; + while ((n = sut.read(buf, 0, buf.length)) != -1) { + out.write(buf, 0, n); + } + sut.close(); + return out.toByteArray(); + } + + private static void assertMultiEqualsSingle(DumpTableEnum table) throws IOException + { + final String pageOne = pageBlock(1, 10, "Page One"); + final String pageTwo = pageBlock(2, 20, "Page Two"); + + final byte[] single = runSingle( + PART_HEADER + pageOne + pageTwo + PART_FOOTER, table); + final byte[] multi = runMulti( + List.of(stream(PART_HEADER + pageOne + PART_FOOTER), + stream(PART_HEADER + pageTwo + PART_FOOTER)), + table); + + assertTrue(single.length > 0, "empty single-doc output for " + table); + assertArrayEquals(single, multi, + "multi-part output diverges from single-document output for " + table); + } + + @Test + void multiPartPageTableMatchesSingleDocument() throws IOException + { + assertMultiEqualsSingle(DumpTableEnum.PAGE); + } + + @Test + void multiPartRevisionTableMatchesSingleDocumentOnIdFields() throws IOException + { + // RevisionWriter also persists the revision timestamp's millisecond component, which + // AbstractXmlDumpReader derives from a GregorianCalendar whose millis field is seeded + // with System.currentTimeMillis() at construction time — a pre-existing non-determinism + // that is orthogonal to multi-part handling. Compare the deterministic (pageId, revId) + // part of each 16-byte record instead. 
+ final String pageOne = pageBlock(1, 10, "Page One"); + final String pageTwo = pageBlock(2, 20, "Page Two"); + + final byte[] single = runSingle( + PART_HEADER + pageOne + pageTwo + PART_FOOTER, DumpTableEnum.REVISION); + final byte[] multi = runMulti( + List.of(stream(PART_HEADER + pageOne + PART_FOOTER), + stream(PART_HEADER + pageTwo + PART_FOOTER)), + DumpTableEnum.REVISION); + + assertTrue(single.length > 0 && single.length == multi.length, + "revision binaries differ in length"); + // Each record: int pageId (4) + int revId (4) + long millis (8) = 16 bytes. + for (int i = 0; i < single.length; i += 16) { + assertArrayEquals( + java.util.Arrays.copyOfRange(single, i, i + 8), + java.util.Arrays.copyOfRange(multi, i, i + 8), + "pageId/revId diverge at record offset " + i); + } + } + + @Test + void multiPartTextTableMatchesSingleDocument() throws IOException + { + assertMultiEqualsSingle(DumpTableEnum.TEXT); + } + + @Test + void singleElementListMatchesSingleStream() throws IOException + { + final String doc = PART_HEADER + pageBlock(7, 77, "Solo") + PART_FOOTER; + + final byte[] fromSingle = runSingle(doc, DumpTableEnum.PAGE); + final byte[] fromList = runMulti(List.of(stream(doc)), DumpTableEnum.PAGE); + + assertArrayEquals(fromSingle, fromList); + } +} diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java index 3b359e38..b29b3a79 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; /** * An abstraction of an {@link InputStream} for Wikipedia table dumps of three {@link DumpTableEnum types}. 
@@ -37,4 +38,28 @@ public abstract class DumpTableInputStream */ public abstract void initialize(InputStream inputStream, DumpTableEnum table) throws IOException; + + /** + * Multi-part counterpart of {@link #initialize(InputStream, DumpTableEnum)}. The default + * implementation transparently forwards a single-element list to the single-stream + * initializer and rejects larger lists with {@link UnsupportedOperationException}; + * subclasses that can read across a sequence of self-contained XML documents should + * override this method. + * + * @param inputStreams Ordered list of input streams. Must not be {@code null} or empty. + * @param table The {@link DumpTableEnum table type}. + * @throws IOException Thrown if IO errors occurred. + */ + public void initialize(List<InputStream> inputStreams, DumpTableEnum table) throws IOException + { + if (inputStreams == null || inputStreams.isEmpty()) { + throw new IllegalArgumentException("'inputStreams' must not be null or empty."); + } + if (inputStreams.size() == 1) { + initialize(inputStreams.get(0), table); + return; + } + throw new UnsupportedOperationException( + "Multi-part initialisation is not supported by " + getClass().getSimpleName()); + } } From 22912a22685071b03f2d632206006f3d2b097223 Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 14:30:12 +0200 Subject: [PATCH 07/10] #420: MultiPartXmlDumpReader takes ownership of part streams readDumps used to leave its InputStream arguments open; callers (DataMachineGenerator, TimeMachineGenerator) also never closed the streams they opened via the decompressor, multiplying the leak by the number of parts in a multi-file dump. - readDumps now closes every stream in the parts list before returning, on both the success and failure paths. Exceptions raised during close or during the wrapper's final flush are attached as suppressed to any primary error from the parse. 
- The error path is simplified: a single primary-exception slot is threaded through parse, per-part close, and wrapper.finish(), then rethrown at the end. Replaces the earlier dual-finish call that relied on finish()'s idempotency for correctness. - Javadoc documents the ownership contract so callers no longer need to close the streams themselves. --- .../dump/xml/MultiPartXmlDumpReader.java | 62 +++++++++++++++---- .../dump/xml/MultiPartXmlDumpReaderTest.java | 52 ++++++++++++++++ 2 files changed, 102 insertions(+), 12 deletions(-) diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java index d365b367..1852390d 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java @@ -61,7 +61,13 @@ private MultiPartXmlDumpReader() * Parses every part in {@code parts} against the same {@code writer}. Events from * the individual parts are funnelled through a {@link MultiPartDumpWriter} so the * delegate observes the combined stream as a single {@code } document. - * The delegate writer is closed by this method after the last part has been consumed. + *

+ * This method takes ownership of the supplied streams: every element of {@code parts} + * is {@link InputStream#close() closed} before {@code readDumps} returns (on both the + * success and failure paths), and the delegate writer is closed via the wrapper's + * {@code finish()} exactly once at the end. Exceptions raised by stream close or by + * the wrapper's final flush are attached as suppressed to any primary error from the + * parse. * * @param parts Ordered list of decompressed XML {@link InputStream streams}. Must * not be {@code null}, must not be empty, and must not contain @@ -70,7 +76,7 @@ private MultiPartXmlDumpReader() * @param factory Instantiates a fresh reader per part — typically a method reference * such as {@code SimpleXmlDumpReader::new}. * @throws IOException Thrown on I/O or SAX errors encountered while parsing - * any part. + * any part, or while closing the supplied streams. * @throws IllegalArgumentException If {@code parts} is null, empty, or contains a null * element. */ @@ -89,21 +95,53 @@ public static void readDumps(List parts, DumpWriter writer, ReaderF } final MultiPartDumpWriter wrapper = new MultiPartDumpWriter(writer); + Throwable primary = null; try { for (InputStream part : parts) { - final AbstractXmlDumpReader reader = factory.apply(part, wrapper); - reader.doParse(); + factory.apply(part, wrapper).doParse(); } - wrapper.finish(); } catch (IOException | RuntimeException e) { - try { - wrapper.finish(); - } - catch (IOException suppressed) { - e.addSuppressed(suppressed); - } - throw e; + primary = e; + } + // We took ownership of every stream — close them all, regardless of outcome. + for (InputStream part : parts) { + primary = closeAndChain(part, primary); + } + // finish() is idempotent; emits a single writeEndWiki (if any startWiki was seen) + // and closes the delegate writer. 
+ try { + wrapper.finish(); + } + catch (IOException e) { + primary = chain(primary, e); + } + + if (primary instanceof IOException) { + throw (IOException) primary; + } + if (primary instanceof RuntimeException) { + throw (RuntimeException) primary; + } + } + + private static Throwable closeAndChain(InputStream stream, Throwable primary) + { + try { + stream.close(); + return primary; + } + catch (IOException e) { + return chain(primary, e); + } + } + + private static Throwable chain(Throwable primary, Throwable next) + { + if (primary == null) { + return next; } + primary.addSuppressed(next); + return primary; } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java index c39538bd..52f85f82 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java @@ -19,14 +19,17 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.dkpro.jwpl.mwdumper.importer.DumpWriter; import org.dkpro.jwpl.mwdumper.importer.Page; @@ -161,6 +164,55 @@ void closesDelegateEvenIfParsingFails() assertEquals(List.of("close"), delegate.events); } + @Test + void closesEveryPartStreamOnSuccess() throws IOException + { + final CountingInputStream a = new CountingInputStream(stream(part(1, 10, "A"))); + final CountingInputStream 
b = new CountingInputStream(stream(part(2, 20, "B"))); + + MultiPartXmlDumpReader.readDumps(List.of(a, b), + new RecordingDumpWriter(), WikiXMLDumpReader::new); + + // SAXParser.parse also closes the stream internally in some JDKs; close() is + // idempotent on InputStream so we only require that each part was closed at least + // once (ownership transferred, no leak). + assertTrue(a.closed.get() >= 1, "first part stream not closed"); + assertTrue(b.closed.get() >= 1, "second part stream not closed"); + } + + @Test + void closesEveryPartStreamOnFailure() + { + // Second part is malformed so parsing fails mid-list; both parts must still be closed. + final CountingInputStream good = new CountingInputStream(stream(part(1, 10, "A"))); + final CountingInputStream bad = new CountingInputStream( + stream("oops")); + + assertThrows(IOException.class, () -> + MultiPartXmlDumpReader.readDumps(List.of(good, bad), + new RecordingDumpWriter(), WikiXMLDumpReader::new)); + + assertTrue(good.closed.get() >= 1, "good part stream not closed"); + assertTrue(bad.closed.get() >= 1, "bad part stream not closed"); + } + + private static final class CountingInputStream extends FilterInputStream + { + final AtomicInteger closed = new AtomicInteger(); + + CountingInputStream(InputStream delegate) + { + super(delegate); + } + + @Override + public void close() throws IOException + { + closed.incrementAndGet(); + super.close(); + } + } + private static final class RecordingDumpWriter implements DumpWriter { From 5561aa163b72a42dd6969c8e9890dcfd0567223c Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 14:32:21 +0200 Subject: [PATCH 08/10] #420: Hoist closeQuietly into AbstractDecompressor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BZip2Decompressor and GZipDecompressor carried byte-identical closeQuietly helpers — the partial-open cleanup routine that closes every stream collected so far and attaches IOExceptions from close as 
suppressed on the primary error. Consolidates to a single protected static method on AbstractDecompressor so any future decompressor that wires up a multi-part path picks it up for free. --- .../decompression/AbstractDecompressor.java | 25 +++++++++++++++++++ .../decompression/BZip2Decompressor.java | 13 ---------- .../decompression/GZipDecompressor.java | 13 ---------- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java index 04da9507..e4f00d01 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java @@ -27,6 +27,7 @@ import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.List; /** * A common base {@link IDecompressor} implementation that provides methods to open @@ -128,4 +129,28 @@ private ClassLoader getContextClassLoader() { return Thread.currentThread().getContextClassLoader(); } + + /** + * Closes every element of {@code streams}, attaching any thrown {@link IOException} as + * suppressed to {@code primary}. Intended for cleanup after a partial open loop where + * several streams have already been created but the call must now unwind. + * + * @param streams Streams to close. {@code null} elements are tolerated. + * @param primary The in-flight exception that triggered cleanup; close failures are + * recorded as suppressed on it. 
+ */ + protected static void closeQuietly(List<InputStream> streams, Throwable primary) + { + for (InputStream s : streams) { + if (s == null) { + continue; + } + try { + s.close(); + } + catch (IOException suppressed) { + primary.addSuppressed(suppressed); + } + } + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java index ed479fff..93618de2 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java @@ -84,17 +84,4 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti throw e; } } - - private static void closeQuietly(List<InputStream> streams, Throwable primary) { - for (InputStream s : streams) { - if (s == null) { - continue; - } - try { - s.close(); - } catch (IOException suppressed) { - primary.addSuppressed(suppressed); - } - } - } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java index 4972a183..434a8b24 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java @@ -83,17 +83,4 @@ public InputStream getInputStreamSequence(List resources) throws IOExcepti throw e; } } - - private static void closeQuietly(List<InputStream> streams, Throwable primary) { - for (InputStream s : streams) { - if (s == null) { - continue; - } - try { - s.close(); - } catch (IOException suppressed) { - primary.addSuppressed(suppressed); - } - } - } } From b559cfc577485eed8ed6821aa120b82e1424aafe Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 
24 Apr 2026 14:36:43 +0200 Subject: [PATCH 09/10] #420: Tighten multi-part API surface Three small encapsulation / API hygiene touch-ups surfaced during review: - DumpFileDiscovery.pageRangeStart is demoted from package-private to private. It is only referenced internally via the DumpFileDiscovery::pageRangeStart method reference inside orderByPageRange; method references to private static members resolve fine from the same class. No reason to leak the compare key into the package. - MultiPartXmlDumpReader.ReaderFactory no longer extends BiFunction. Extending the JDK SAM pulled BiFunction.andThen into the public surface, which has no meaningful semantics here. It is now a standalone @FunctionalInterface with a single create(InputStream, DumpWriter) method; method references such as SimpleXmlDumpReader::new continue to match the SAM unchanged. - MultiPartDumpWriter gains an explicit "not thread-safe" javadoc note. Matches the DumpWriter contract and documents the single-threaded expectation the multi-part pipeline relies on. --- .../mwdumper/importer/MultiPartDumpWriter.java | 4 ++++ .../dump/xml/XMLDumpTableInputStreamThread.java | 2 +- .../dump/xml/MultiPartXmlDumpReader.java | 14 ++++++++++---- .../jwpl/wikimachine/util/DumpFileDiscovery.java | 2 +- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java b/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java index 4247c405..5d1dc05e 100644 --- a/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java +++ b/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java @@ -40,6 +40,10 @@ * * Call {@link #finish()} exactly once after all parts have been parsed to emit the single * {@code writeEndWiki()} and close the delegate. + *

+ * Thread-safety: not thread-safe. Mirroring the {@link DumpWriter} contract, instances + * are intended for single-threaded use — events from one parser at a time. In the multi-part + * pipeline that means all parts are parsed sequentially against the same wrapper. */ public final class MultiPartDumpWriter implements DumpWriter diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java index b8fc58f6..9fa1e051 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java @@ -119,7 +119,7 @@ private static DumpWriter createWriter(OutputStream oStream, DumpTableEnum table private static AbstractXmlDumpReader createReader(InputStream in, DumpWriter writer, DumpTableEnum table) { - return readerFactoryFor(table).apply(in, writer); + return readerFactoryFor(table).create(in, writer); } private static MultiPartXmlDumpReader.ReaderFactory readerFactoryFor(DumpTableEnum table) diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java index 1852390d..3d7a7d94 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.util.List; import java.util.Objects; -import java.util.function.BiFunction; import org.dkpro.jwpl.mwdumper.importer.DumpWriter; import org.dkpro.jwpl.mwdumper.importer.MultiPartDumpWriter; @@ -44,12 +43,19 @@ public final class 
MultiPartXmlDumpReader /** * Factory that produces a fresh {@link AbstractXmlDumpReader} (typically a concrete * subclass such as {@code SimpleXmlDumpReader} or a {@code timemachine} reader) for - * a given part's {@link InputStream} and the shared {@link DumpWriter}. + * a given part's {@link InputStream} and the shared {@link DumpWriter}. Callers + * normally pass a constructor reference such as {@code SimpleXmlDumpReader::new}. */ @FunctionalInterface public interface ReaderFactory - extends BiFunction<InputStream, DumpWriter, AbstractXmlDumpReader> { + /** + * @param in The part's input stream. + * @param writer The shared dump writer (already wrapped in a + * {@link MultiPartDumpWriter} by {@link #readDumps}). + * @return A fresh reader bound to {@code in} and {@code writer}. + */ + AbstractXmlDumpReader create(InputStream in, DumpWriter writer); } private MultiPartXmlDumpReader() @@ -98,7 +104,7 @@ public static void readDumps(List parts, DumpWriter writer, ReaderF Throwable primary = null; try { for (InputStream part : parts) { - factory.apply(part, wrapper).doParse(); + factory.create(part, wrapper).doParse(); } } catch (IOException | RuntimeException e) { diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java index 1dc7b7c7..43e49891 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java @@ -138,7 +138,7 @@ public static List orderByPageRange(Collection files) * {@link Long#MIN_VALUE} if absent. Files without a range therefore sort before * ranged parts in {@link #orderByPageRange}. 
*/ - static long pageRangeStart(File file) + private static long pageRangeStart(File file) { if (file == null) { return Long.MIN_VALUE; From 308766360a1c2e0464a80d9922ea2eb90a8e7f4a Mon Sep 17 00:00:00 2001 From: Richard Zowalla Date: Fri, 24 Apr 2026 14:40:44 +0200 Subject: [PATCH 10/10] #420: Replace mode flags in XMLDumpTableInputStreamThread with strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The thread class had two operating modes crammed into one body: the single-file path set xmlReader and left the multi-part fields null; the multi-part path did the opposite. run() and abort() switched on (parts != null). Classic "state machine in nullable fields". Replaces the mode flags with two final strategies assigned once in each constructor: - ParseTask parseTask — what run() invokes (reader.readDump() for the single-file path, MultiPartXmlDumpReader.readDumps(...) for the multi-part path). - Runnable abortAction — what abort() invokes (reader.abort() for the single-file path, a no-op for multi-part). No null fields, no runtime mode checks, and the multi-part path no longer needs to keep a separate DumpWriter + ReaderFactory as state purely to reconstruct the action at run() time. 
--- .../xml/XMLDumpTableInputStreamThread.java | 76 +++++++------------ 1 file changed, 29 insertions(+), 47 deletions(-) diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java index 9fa1e051..799ad2f5 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java @@ -42,51 +42,43 @@ class XMLDumpTableInputStreamThread .getLogger(MethodHandles.lookup().lookupClass()); /** - * Enable the main and category pages as well as discussions + * Enable the main and category pages as well as discussions. */ private static final String ENABLED_NAMESPACES = "NS_MAIN,NS_TALK,NS_CATEGORY"; - /** - * Generalization {@link org.dkpro.jwpl.mwdumper.importer.XmlDumpReader} that parses the XML - dump - */ - private AbstractXmlDumpReader xmlReader; + /** Parses the bound input into SQL. May throw {@link IOException}. */ + @FunctionalInterface + private interface ParseTask + { + void parse() throws IOException; + } - /** - * Populated in multi-part mode. When non-null, {@link #run()} drives parsing through - * {@link MultiPartXmlDumpReader} instead of a single {@link AbstractXmlDumpReader#readDump()}. - */ - private final List<InputStream> parts; - private final DumpWriter multiPartWriter; - private final MultiPartXmlDumpReader.ReaderFactory readerFactory; + private final ParseTask parseTask; + private final Runnable abortAction; - /** - * completion flag for a conversion process - */ + /** completion flag for the conversion process */ private boolean isComplete; /** - * Initiate input and output streams for a single-file dump. + * Drive the conversion of a single-file dump. 
* - * @param iStream XML input stream - * @param oStream SQL output stream + * @param iStream XML input stream. + * @param oStream SQL output stream. * @param table Kind of table output expected. - * @throws IOException Thrown in case errors occurred. */ public XMLDumpTableInputStreamThread(InputStream iStream, OutputStream oStream, DumpTableEnum table) - throws IOException { super("xml2sql"); - this.parts = null; - this.multiPartWriter = null; - this.readerFactory = null; - this.xmlReader = createReader(iStream, createWriter(oStream, table), table); + final AbstractXmlDumpReader reader = readerFactoryFor(table) + .create(iStream, createWriter(oStream, table)); + this.parseTask = reader::readDump; + this.abortAction = reader::abort; } /** - * Initiate input and output streams for a multi-part dump. Every element of {@code iStreams} - * is a self-contained XML document; SAX events across parts are collapsed into a single + * Drive the conversion of a multi-part dump. Each element of {@code iStreams} is a + * self-contained XML document; SAX events across parts are collapsed into a single * logical document by {@link MultiPartXmlDumpReader}. * * @param iStreams Ordered list of XML part input streams (ascending page-range). @@ -97,9 +89,12 @@ public XMLDumpTableInputStreamThread(List iStreams, OutputStream oS DumpTableEnum table) { super("xml2sql"); - this.parts = iStreams; - this.multiPartWriter = createWriter(oStream, table); - this.readerFactory = readerFactoryFor(table); + final DumpWriter writer = createWriter(oStream, table); + final MultiPartXmlDumpReader.ReaderFactory factory = readerFactoryFor(table); + this.parseTask = () -> MultiPartXmlDumpReader.readDumps(iStreams, writer, factory); + // Abort is a best-effort signal to the single-file reader; the multi-part pipeline + // has no equivalent per-part hook, so it is a no-op here. 
+ this.abortAction = () -> { /* no-op */ }; } private static DumpWriter createWriter(OutputStream oStream, DumpTableEnum table) @@ -116,12 +111,6 @@ private static DumpWriter createWriter(OutputStream oStream, DumpTableEnum table } } - private static AbstractXmlDumpReader createReader(InputStream in, DumpWriter writer, - DumpTableEnum table) - { - return readerFactoryFor(table).create(in, writer); - } - private static MultiPartXmlDumpReader.ReaderFactory readerFactoryFor(DumpTableEnum table) { switch (table) { @@ -141,12 +130,7 @@ public synchronized void run() { try { isComplete = false; - if (parts != null) { - MultiPartXmlDumpReader.readDumps(parts, multiPartWriter, readerFactory); - } - else { - xmlReader.readDump(); - } + parseTask.parse(); isComplete = true; } catch (IOException e) { @@ -158,15 +142,13 @@ public synchronized void run() /** * Abort a conversion. *

- * Only supported in single-file mode. In multi-part mode the abort flag is recorded but does - * not interrupt an in-flight SAX parse — callers must let the current part finish. + * Only supported in single-file mode. In multi-part mode the abort flag is recorded but + * does not interrupt an in-flight SAX parse — callers must let the current part finish. */ public synchronized void abort() { if (!isComplete) { - if (xmlReader != null) { - xmlReader.abort(); - } + abortAction.run(); isComplete = true; } }