Skip to content

Commit 47a7582

Browse files
committed
test: Readd streaming test
This work was derived from the existing generated tests using Gemini 3.1 Pro
1 parent 47785ab commit 47a7582

File tree

1 file changed

+239
-0
lines changed

1 file changed

+239
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package dev.hallock.zstd.test;
2+
3+
import dev.hallock.zstd.*;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.params.ParameterizedTest;
6+
import org.junit.jupiter.params.provider.Arguments;
7+
import org.junit.jupiter.params.provider.MethodSource;
8+
9+
import java.lang.foreign.Arena;
10+
import java.lang.foreign.MemorySegment;
11+
import java.lang.foreign.ValueLayout;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.Arrays;
14+
import java.util.List;
15+
import java.util.Random;
16+
import java.util.stream.Stream;
17+
18+
public class ZstdStreamingTest {
19+
20+
public static List<byte[]> provideDataSets() {
21+
Random random = new Random(67);
22+
23+
byte[] empty = new byte[0];
24+
25+
byte[] smallRandom = new byte[50];
26+
random.nextBytes(smallRandom);
27+
28+
byte[] largeRandom = new byte[1024 * 1024]; // 1MB
29+
random.nextBytes(largeRandom);
30+
31+
byte[] largeCompressible = "This is a highly compressible string. ".repeat(10000).getBytes(StandardCharsets.UTF_8); // ~380KB
32+
33+
return Arrays.asList(empty, smallRandom, largeCompressible, largeRandom);
34+
}
35+
36+
public static List<Integer> provideChunkSizes() {
37+
return Arrays.asList(1, 10, 1024, 4096);
38+
}
39+
40+
public static Stream<Arguments> provideStreamingData() {
41+
return provideDataSets().stream()
42+
.flatMap(data -> provideChunkSizes().stream()
43+
.map(chunkSize -> Arguments.of(data, chunkSize)));
44+
}
45+
46+
@ParameterizedTest
47+
@MethodSource("provideStreamingData")
48+
public void testStreamingWithSmallOutputBuffer(byte[] testData, int chunkSize) throws ZstdException {
49+
try (Arena arena = Arena.ofConfined();
50+
ZstdCompressionContext cctx = new ZstdCompressionContext();
51+
ZstdDecompressionContext dctx = new ZstdDecompressionContext()) {
52+
53+
MemorySegment src = arena.allocateFrom(ValueLayout.JAVA_BYTE, testData);
54+
MemorySegment compressedDst = arena.allocate(Zstd.compressBound(testData.length) + 1024);
55+
MemorySegment decompressedDst = arena.allocate(testData.length);
56+
57+
ZstdInputBuffer inBuffer = new ZstdInputBuffer(arena, src);
58+
ZstdOutputBuffer outBuffer = new ZstdOutputBuffer(arena, compressedDst);
59+
outBuffer.size(chunkSize); // Sub-buffer output buffer
60+
61+
while (inBuffer.position() < inBuffer.size()) {
62+
ZstdEndDirective directive = ZstdEndDirective.CONTINUE;
63+
cctx.compressStream(outBuffer, inBuffer, directive).orElseThrow();
64+
65+
if (outBuffer.position() == outBuffer.size()) {
66+
// Output buffer is full, expand it
67+
outBuffer.size(outBuffer.position() + chunkSize);
68+
}
69+
}
70+
71+
while (true) {
72+
long remaining = cctx.compressStream(outBuffer, inBuffer, ZstdEndDirective.END).orElseThrow().result();
73+
if (remaining == 0) break;
74+
75+
if (outBuffer.position() == outBuffer.size()) {
76+
outBuffer.size(outBuffer.position() + chunkSize);
77+
}
78+
}
79+
80+
long compressedSize = outBuffer.position();
81+
MemorySegment actualCompressed = compressedDst.asSlice(0, compressedSize);
82+
83+
ZstdInputBuffer dInBuf = new ZstdInputBuffer(arena, actualCompressed);
84+
ZstdOutputBuffer dOutBuf = new ZstdOutputBuffer(arena, decompressedDst);
85+
dOutBuf.size(chunkSize); // Sub-buffer output buffer for decompression
86+
87+
while (dInBuf.position() < dInBuf.size()) {
88+
dctx.decompressStream(dOutBuf, dInBuf).orElseThrow();
89+
90+
if (dOutBuf.position() == dOutBuf.size()) {
91+
// Output buffer is full, expand it
92+
dOutBuf.size(dOutBuf.position() + chunkSize);
93+
}
94+
}
95+
96+
byte[] actualDecompressed = decompressedDst.asSlice(0, dOutBuf.position()).toArray(ValueLayout.JAVA_BYTE);
97+
Assertions.assertArrayEquals(testData, actualDecompressed);
98+
}
99+
}
100+
101+
@ParameterizedTest
102+
@MethodSource("provideStreamingData")
103+
public void testStreamingWithFlush(byte[] testData, int chunkSize) throws ZstdException {
104+
try (Arena arena = Arena.ofConfined();
105+
ZstdCompressionContext cctx = new ZstdCompressionContext();
106+
ZstdDecompressionContext dctx = new ZstdDecompressionContext()) {
107+
108+
MemorySegment src = arena.allocateFrom(ValueLayout.JAVA_BYTE, testData);
109+
MemorySegment compressedDst = arena.allocate(Zstd.compressBound(testData.length) + 1024);
110+
MemorySegment decompressedDst = arena.allocate(testData.length);
111+
112+
ZstdInputBuffer inBuffer = new ZstdInputBuffer(arena, src);
113+
ZstdOutputBuffer outBuffer = new ZstdOutputBuffer(arena, compressedDst);
114+
outBuffer.size(chunkSize);
115+
116+
ZstdInputBuffer dInBuf = new ZstdInputBuffer(arena, compressedDst);
117+
ZstdOutputBuffer dOutBuf = new ZstdOutputBuffer(arena, decompressedDst);
118+
dOutBuf.size(chunkSize);
119+
120+
// Compress half
121+
long half = src.byteSize() / 2;
122+
inBuffer.size(half);
123+
124+
while (inBuffer.position() < inBuffer.size()) {
125+
cctx.compressStream(outBuffer, inBuffer, ZstdEndDirective.CONTINUE).orElseThrow();
126+
if (outBuffer.position() == outBuffer.size()) {
127+
outBuffer.size(outBuffer.position() + chunkSize);
128+
}
129+
}
130+
131+
cctx.compressStream(outBuffer, inBuffer, ZstdEndDirective.FLUSH).orElseThrow();
132+
133+
// Should be able to decompress this part immediately
134+
long partialCompressedSize = outBuffer.position();
135+
if (partialCompressedSize > 0) {
136+
dInBuf.size(partialCompressedSize);
137+
while (dInBuf.position() < dInBuf.size()) {
138+
dctx.decompressStream(dOutBuf, dInBuf).orElseThrow();
139+
if (dOutBuf.position() == dOutBuf.size()) {
140+
dOutBuf.size(dOutBuf.position() + chunkSize);
141+
}
142+
}
143+
}
144+
145+
// Compress the rest
146+
inBuffer.size(src.byteSize());
147+
while (inBuffer.position() < inBuffer.size()) {
148+
cctx.compressStream(outBuffer, inBuffer, ZstdEndDirective.CONTINUE).orElseThrow();
149+
if (outBuffer.position() == outBuffer.size()) {
150+
outBuffer.size(outBuffer.position() + chunkSize);
151+
}
152+
}
153+
154+
while (true) {
155+
long remaining = cctx.compressStream(outBuffer, inBuffer, ZstdEndDirective.END).orElseThrow().result();
156+
if (remaining == 0) break;
157+
if (outBuffer.position() == outBuffer.size()) {
158+
outBuffer.size(outBuffer.position() + chunkSize);
159+
}
160+
}
161+
162+
// Decompress the rest
163+
long totalCompressedSize = outBuffer.position();
164+
if (totalCompressedSize > 0) {
165+
dInBuf.size(totalCompressedSize);
166+
167+
while (dInBuf.position() < dInBuf.size()) {
168+
dctx.decompressStream(dOutBuf, dInBuf).orElseThrow();
169+
if (dOutBuf.position() == dOutBuf.size()) {
170+
dOutBuf.size(dOutBuf.position() + chunkSize);
171+
}
172+
}
173+
174+
byte[] actualDecompressed = decompressedDst.asSlice(0, dOutBuf.position()).toArray(ValueLayout.JAVA_BYTE);
175+
Assertions.assertArrayEquals(testData, actualDecompressed);
176+
} else {
177+
Assertions.assertArrayEquals(new byte[0], testData);
178+
}
179+
}
180+
}
181+
182+
@ParameterizedTest
183+
@MethodSource("provideStreamingData")
184+
public void testStreamingMultipleChunks(byte[] testData, int chunkSize) throws ZstdException {
185+
try (Arena arena = Arena.ofConfined(); ZstdCompressionContext cctx = new ZstdCompressionContext();
186+
ZstdDecompressionContext dctx = new ZstdDecompressionContext()) {
187+
188+
MemorySegment src = arena.allocateFrom(ValueLayout.JAVA_BYTE, testData);
189+
MemorySegment compressedDst = arena.allocate(Zstd.compressBound(testData.length) + 1024);
190+
MemorySegment decompressedDst = arena.allocate(testData.length);
191+
192+
ZstdInputBuffer inBuffer = new ZstdInputBuffer(arena, src);
193+
ZstdOutputBuffer outBuffer = new ZstdOutputBuffer(arena, compressedDst);
194+
195+
// Compress in chunks
196+
while (inBuffer.position() < inBuffer.size()) {
197+
long remaining = inBuffer.size() - inBuffer.position();
198+
long currentChunk = Math.min(chunkSize, remaining);
199+
200+
// Simulate smaller input size for this chunk
201+
long originalPosition = inBuffer.position();
202+
inBuffer.size(originalPosition + currentChunk);
203+
204+
ZstdEndDirective directive = (inBuffer.size() == src.byteSize()) ? ZstdEndDirective.END : ZstdEndDirective.CONTINUE;
205+
cctx.compressStream(outBuffer, inBuffer, directive).orElseThrow();
206+
207+
// Restore actual size for loop condition
208+
inBuffer.size(src.byteSize());
209+
}
210+
211+
// Finish compression if we haven't already with END
212+
// Ensure ending works properly
213+
long remaining = cctx.compressStream(outBuffer, inBuffer, ZstdEndDirective.END).orElseThrow().result();
214+
Assertions.assertEquals(0, remaining, "Stream not fully flushed");
215+
216+
long compressedSize = outBuffer.position();
217+
MemorySegment actualCompressed = compressedDst.asSlice(0, compressedSize);
218+
219+
// Decompress in chunks
220+
ZstdInputBuffer dInBuf = new ZstdInputBuffer(arena, actualCompressed);
221+
ZstdOutputBuffer dOutBuf = new ZstdOutputBuffer(arena, decompressedDst);
222+
223+
while (dInBuf.position() < dInBuf.size()) {
224+
long remainingIn = dInBuf.size() - dInBuf.position();
225+
long currentChunk = Math.min(chunkSize, remainingIn);
226+
227+
long originalPosition = dInBuf.position();
228+
dInBuf.size(originalPosition + currentChunk);
229+
230+
dctx.decompressStream(dOutBuf, dInBuf).orElseThrow();
231+
232+
dInBuf.size(actualCompressed.byteSize());
233+
}
234+
235+
byte[] actualDecompressed = decompressedDst.asSlice(0, dOutBuf.position()).toArray(ValueLayout.JAVA_BYTE);
236+
Assertions.assertArrayEquals(testData, actualDecompressed);
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)