From c33eb699c558fa8ea523dbb41b07eb26afa51ad6 Mon Sep 17 00:00:00 2001 From: f1v3-dev Date: Thu, 2 Apr 2026 18:24:26 +0900 Subject: [PATCH] FEATURE: Add pluggable compression codec support with Snappy algorithm --- pom.xml | 9 ++ .../BaseSerializingTranscoder.java | 18 ++- .../transcoders/CompressionUtils.java | 1 + .../GenericJsonSerializingTranscoder.java | 35 +++--- .../JsonSerializingTranscoder.java | 26 ++-- .../transcoders/SerializingTranscoder.java | 27 +++- .../compression/CompressionCodecIF.java | 12 ++ .../compression/GZIPCompressionCodec.java | 69 ++++++++++ .../compression/SnappyCompressionCodec.java | 55 ++++++++ .../compression/CompressionCodecTest.java | 118 ++++++++++++++++++ 10 files changed, 334 insertions(+), 36 deletions(-) create mode 100644 src/main/java/net/spy/memcached/transcoders/compression/CompressionCodecIF.java create mode 100644 src/main/java/net/spy/memcached/transcoders/compression/GZIPCompressionCodec.java create mode 100644 src/main/java/net/spy/memcached/transcoders/compression/SnappyCompressionCodec.java create mode 100644 src/test/java/net/spy/memcached/transcoders/compression/CompressionCodecTest.java diff --git a/pom.xml b/pom.xml index 6c5b611b9..9010581db 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ 2.0.2 2.19.0 3.0.2 + 1.1.10.8 @@ -159,6 +160,14 @@ ${findbugs-jsr305.version} provided + + + + org.xerial.snappy + snappy-java + ${snappy-java.version} + compile + diff --git a/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java b/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java index e24fc136f..cdf90e08d 100644 --- a/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java @@ -26,6 +26,8 @@ import java.lang.reflect.Proxy; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.transcoders.compression.CompressionCodecIF; +import net.spy.memcached.transcoders.compression.GZIPCompressionCodec; /** * Base class for any transcoders that may want to work with serialized or @@ -42,7 +44,7 @@ public abstract class BaseSerializingTranscoder extends SpyObject { */ private final ClassLoader classLoader; - private final CompressionUtils cu = new CompressionUtils(); + private final CompressionCodecIF compressionCodec; protected final TranscoderUtils tu; /** @@ -57,9 +59,15 @@ public BaseSerializingTranscoder(int max, ClassLoader cl) { } public BaseSerializingTranscoder(int max, ClassLoader cl, boolean pack) { + this(max, cl, pack, new GZIPCompressionCodec()); + } + + public BaseSerializingTranscoder(int max, ClassLoader cl, boolean pack, + CompressionCodecIF codec) { super(); this.maxSize = max; this.classLoader = cl; + this.compressionCodec = codec; this.tu = new TranscoderUtils(pack); } @@ -71,7 +79,7 @@ public BaseSerializingTranscoder(int max, ClassLoader cl, boolean pack) { * @param threshold the number of bytes */ public void setCompressionThreshold(int threshold) { - cu.setCompressionThreshold(threshold); + compressionCodec.setCompressionThreshold(threshold); } public String getCharset() { @@ -127,7 +135,7 @@ protected Object deserialize(byte[] in) { * @throws NullPointerException if the input is null */ protected byte[] compress(byte[] in) { - return cu.compress(in); + return compressionCodec.compress(in); } /** @@ -137,7 +145,7 @@ protected byte[] compress(byte[] in) { * @return the decompressed byte array, or null if input is null or decompression fails */ protected byte[] decompress(byte[] in) { - return cu.decompress(in); + return compressionCodec.decompress(in); } /** @@ -147,7 +155,7 @@ protected byte[] decompress(byte[] in) { * @return true if the data should be compressed, false otherwise */ protected boolean isCompressionCandidate(byte[] data) { - return cu.isCompressionCandidate(data); + return compressionCodec.isCompressionCandidate(data); } public int getMaxSize() { diff --git a/src/main/java/net/spy/memcached/transcoders/CompressionUtils.java b/src/main/java/net/spy/memcached/transcoders/CompressionUtils.java index 51b0eb90b..10078cf11 100644 --- a/src/main/java/net/spy/memcached/transcoders/CompressionUtils.java +++ b/src/main/java/net/spy/memcached/transcoders/CompressionUtils.java @@ -12,6 +12,7 @@ /** * Utility class for compression and decompression operations. */ +@Deprecated public class CompressionUtils extends SpyObject { public static final int DEFAULT_COMPRESSION_THRESHOLD = 16384; diff --git a/src/main/java/net/spy/memcached/transcoders/GenericJsonSerializingTranscoder.java b/src/main/java/net/spy/memcached/transcoders/GenericJsonSerializingTranscoder.java index 4b16fd25a..6ede59a80 100644 --- a/src/main/java/net/spy/memcached/transcoders/GenericJsonSerializingTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/GenericJsonSerializingTranscoder.java @@ -10,6 +10,8 @@ import net.spy.memcached.CachedData; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.transcoders.compression.CompressionCodecIF; +import net.spy.memcached.transcoders.compression.GZIPCompressionCodec; import static net.spy.memcached.transcoders.TranscoderUtils.COMPRESSED; import static net.spy.memcached.transcoders.TranscoderUtils.SERIALIZED; @@ -30,7 +32,7 @@ public class GenericJsonSerializingTranscoder extends SpyObject implements Trans private final ObjectMapper objectMapper; private final int maxSize; - private final CompressionUtils cu; + private final CompressionCodecIF compressionCodec; private final TranscoderUtils tu; private final boolean isCollection; private final boolean forceJsonSerializeForCollection; @@ -53,15 +55,7 @@ public GenericJsonSerializingTranscoder(ObjectMapper objectMapper, String typeHi @Deprecated public GenericJsonSerializingTranscoder(ObjectMapper objectMapper, int max) { - if (objectMapper == null) { - throw new IllegalArgumentException("ObjectMapper must not be null"); - } - this.objectMapper = objectMapper; - this.maxSize = max; - this.cu = new CompressionUtils(); - this.tu = new TranscoderUtils(true); - this.isCollection = false; - this.forceJsonSerializeForCollection = false; + this(objectMapper, max, false, false, new GZIPCompressionCodec()); } /** @@ -71,13 +65,14 @@ public GenericJsonSerializingTranscoder(ObjectMapper objectMapper, int max) { */ private GenericJsonSerializingTranscoder(ObjectMapper objectMapper, int max, boolean isCollection, - boolean forceJsonSerializeForCollection) { + boolean forceJsonSerializeForCollection, + CompressionCodecIF codec) { if (objectMapper == null) { throw new IllegalArgumentException("ObjectMapper must not be null"); } this.objectMapper = objectMapper; this.maxSize = max; - this.cu = new CompressionUtils(); + this.compressionCodec = codec; this.tu = new TranscoderUtils(true); this.isCollection = isCollection; this.forceJsonSerializeForCollection = forceJsonSerializeForCollection; @@ -131,7 +126,7 @@ public boolean isForceSerializeForCollection() { * @param threshold the number of bytes */ public void setCompressionThreshold(int threshold) { - cu.setCompressionThreshold(threshold); + compressionCodec.setCompressionThreshold(threshold); } /** @@ -153,7 +148,7 @@ public Object decode(CachedData d) { } if ((d.getFlags() & COMPRESSED) != 0) { - data = cu.decompress(data); + data = compressionCodec.decompress(data); } Object rv = null; @@ -241,8 +236,8 @@ public CachedData encode(Object o) { flags |= SERIALIZED; } assert b != null; - if (!isCollection && cu.isCompressionCandidate(b)) { - byte[] compressed = cu.compress(b); + if (!isCollection && compressionCodec.isCompressionCandidate(b)) { + byte[] compressed = compressionCodec.compress(b); if (compressed.length < b.length) { getLogger().debug("Compressed %s from %d to %d", o.getClass().getName(), b.length, compressed.length); @@ -287,6 +282,7 @@ public static final class Builder { private int max; private boolean isCollection; private boolean forceJsonSerializeForCollection; + private CompressionCodecIF compressionCodec = new GZIPCompressionCodec(); private Builder(ObjectMapper objectMapper) { this.objectMapper = objectMapper; @@ -326,6 +322,11 @@ public Builder typeHintPropertyName(String typeHintPropertyName) { return this; } + public Builder compressionCodec(CompressionCodecIF codec) { + this.compressionCodec = codec; + return this; + } + public Builder forceJsonSerializeForCollection() { if (!isCollection) { throw new IllegalStateException("forceJsonSerializationForCollection can only be " + @@ -346,7 +347,7 @@ public GenericJsonSerializingTranscoder build() { objectMapper.setDefaultTyping(typer); } return new GenericJsonSerializingTranscoder(objectMapper, max, - isCollection, forceJsonSerializeForCollection); + isCollection, forceJsonSerializeForCollection, compressionCodec); } } } diff --git a/src/main/java/net/spy/memcached/transcoders/JsonSerializingTranscoder.java b/src/main/java/net/spy/memcached/transcoders/JsonSerializingTranscoder.java index b82c68d69..87d578180 100644 --- a/src/main/java/net/spy/memcached/transcoders/JsonSerializingTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/JsonSerializingTranscoder.java @@ -26,6 +26,8 @@ import net.spy.memcached.CachedData; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.transcoders.compression.CompressionCodecIF; +import net.spy.memcached.transcoders.compression.GZIPCompressionCodec; import static net.spy.memcached.transcoders.TranscoderUtils.COMPRESSED; import static net.spy.memcached.transcoders.TranscoderUtils.SERIALIZED; @@ -48,7 +50,7 @@ public class JsonSerializingTranscoder extends SpyObject implements Transcode private final ObjectMapper objectMapper; private final JavaType javaType; private final int maxSize; - private final CompressionUtils cu; + private final CompressionCodecIF compressionCodec; private final TranscoderUtils tu; public JsonSerializingTranscoder(Class clazz) { @@ -60,18 +62,26 @@ public JsonSerializingTranscoder(JavaType javaType) { } public JsonSerializingTranscoder(int max, Class clazz) { + this(max, clazz, new GZIPCompressionCodec()); + } + + public JsonSerializingTranscoder(int max, JavaType javaType) { + this(max, javaType, new GZIPCompressionCodec()); + } + + public JsonSerializingTranscoder(int max, Class clazz, CompressionCodecIF codec) { this.maxSize = max; this.objectMapper = new ObjectMapper(); this.javaType = objectMapper.getTypeFactory().constructType(clazz); - this.cu = new CompressionUtils(); + this.compressionCodec = codec; this.tu = new TranscoderUtils(true); } - public JsonSerializingTranscoder(int max, JavaType javaType) { + public JsonSerializingTranscoder(int max, JavaType javaType, CompressionCodecIF codec) { this.maxSize = max; this.objectMapper = new ObjectMapper(); this.javaType = Objects.requireNonNull(javaType, "JavaType must not be null"); - this.cu = new CompressionUtils(); + this.compressionCodec = codec; this.tu = new TranscoderUtils(true); } @@ -88,7 +98,7 @@ public int getMaxSize() { * @param threshold the number of bytes */ public void setCompressionThreshold(int threshold) { - cu.setCompressionThreshold(threshold); + this.compressionCodec.setCompressionThreshold(threshold); } /** @@ -110,7 +120,7 @@ public T decode(CachedData d) { } if ((d.getFlags() & COMPRESSED) != 0) { - data = cu.decompress(data); + data = compressionCodec.decompress(data); } Object rv = null; @@ -192,8 +202,8 @@ public CachedData encode(T o) { flags |= SERIALIZED; } assert b != null; - if (cu.isCompressionCandidate(b)) { - byte[] compressed = cu.compress(b); + if (compressionCodec.isCompressionCandidate(b)) { + byte[] compressed = compressionCodec.compress(b); if (compressed.length < b.length) { getLogger().debug("Compressed %s from %d to %d", o.getClass().getName(), b.length, compressed.length); diff --git a/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java b/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java index c124443ee..fe99f0631 100644 --- a/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java @@ -21,6 +21,8 @@ import java.util.Date; import net.spy.memcached.CachedData; +import net.spy.memcached.transcoders.compression.CompressionCodecIF; +import net.spy.memcached.transcoders.compression.GZIPCompressionCodec; import static net.spy.memcached.transcoders.TranscoderUtils.COMPRESSED; import static net.spy.memcached.transcoders.TranscoderUtils.SERIALIZED; @@ -54,15 +56,20 @@ public class SerializingTranscoder extends BaseSerializingTranscoder * Get a serializing transcoder with the default max data size. */ public SerializingTranscoder() { - this(CachedData.MAX_SIZE, null, false, false); + this(CachedData.MAX_SIZE, null, false, false, new GZIPCompressionCodec()); } public SerializingTranscoder(int max) { - this(max, null, false, false); + this(max, null, false, false, new GZIPCompressionCodec()); } public SerializingTranscoder(int max, ClassLoader cl) { - this(max, cl, false, false); + this(max, cl, false, false, new GZIPCompressionCodec()); + } + + public SerializingTranscoder(int max, ClassLoader cl, boolean isCollection, + boolean forceJDKSerializeForCollection) { + this(max, cl, isCollection, forceJDKSerializeForCollection, new GZIPCompressionCodec()); } /** @@ -71,8 +78,9 @@ public SerializingTranscoder(int max, ClassLoader cl) { * or Builder for custom configurations. */ protected SerializingTranscoder(int max, ClassLoader cl, boolean isCollection, - boolean forceJDKSerializeForCollection) { - super(max, cl); + boolean forceJDKSerializeForCollection, + CompressionCodecIF codec) { + super(max, cl, true, codec); this.isCollection = isCollection; this.forceJDKSerializeForCollection = forceJDKSerializeForCollection; } @@ -207,6 +215,7 @@ public static final class Builder { private ClassLoader cl; private boolean isCollection; private boolean forceJDKSerializeForCollection; + private CompressionCodecIF compressionCodec = new GZIPCompressionCodec(); private Builder() {} @@ -236,6 +245,11 @@ public Builder classLoader(ClassLoader cl) { return this; } + public Builder compressionCodec(CompressionCodecIF codec) { + this.compressionCodec = codec; + return this; + } + /** * By default, this transcoder uses Java serialization only if the type is a user-defined class. * This mechanism may cause malfunction if you store Object type values @@ -255,7 +269,8 @@ public Builder forceJDKSerializationForCollection() { } public SerializingTranscoder build() { - return new SerializingTranscoder(max, cl, isCollection, forceJDKSerializeForCollection); + return new SerializingTranscoder(max, cl, isCollection, + forceJDKSerializeForCollection, compressionCodec); } } } diff --git a/src/main/java/net/spy/memcached/transcoders/compression/CompressionCodecIF.java b/src/main/java/net/spy/memcached/transcoders/compression/CompressionCodecIF.java new file mode 100644 index 000000000..8034bc1df --- /dev/null +++ b/src/main/java/net/spy/memcached/transcoders/compression/CompressionCodecIF.java @@ -0,0 +1,12 @@ +package net.spy.memcached.transcoders.compression; + +public interface CompressionCodecIF { + + byte[] compress(byte[] data); + + byte[] decompress(byte[] data); + + boolean isCompressionCandidate(byte[] data); + + void setCompressionThreshold(int threshold); +} diff --git a/src/main/java/net/spy/memcached/transcoders/compression/GZIPCompressionCodec.java b/src/main/java/net/spy/memcached/transcoders/compression/GZIPCompressionCodec.java new file mode 100644 index 000000000..1252247de --- /dev/null +++ b/src/main/java/net/spy/memcached/transcoders/compression/GZIPCompressionCodec.java @@ -0,0 +1,69 @@ +package net.spy.memcached.transcoders.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import net.spy.memcached.compat.SpyObject; + + +public class GZIPCompressionCodec extends SpyObject implements CompressionCodecIF { + + private static final int DECOMPRESSION_BUF_SIZE = 8192; + + private volatile int compressionThreshold = 16_384; + + @Override + public byte[] compress(byte[] in) { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try (GZIPOutputStream gz = new GZIPOutputStream(bos)) { + gz.write(in); + } catch (IOException e) { + throw new RuntimeException("IO exception compressing data", e); + } + + byte[] result = bos.toByteArray(); + getLogger().debug("Compressed %d bytes to %d", in.length, result.length); + return result; + } + + @Override + public byte[] decompress(byte[] in) { + if (in == null) { + return null; + } + + try (ByteArrayInputStream bis = new ByteArrayInputStream(in); + GZIPInputStream gis = new GZIPInputStream(bis); + ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + + byte[] buf = new byte[DECOMPRESSION_BUF_SIZE]; + int read; + while ((read = gis.read(buf)) > 0) { + bos.write(buf, 0, read); + } + + return bos.toByteArray(); + } catch (IOException e) { + getLogger().warn("Failed to decompress data", e); + return null; + } + } + + @Override + public boolean isCompressionCandidate(byte[] data) { + return data != null && data.length > compressionThreshold; + } + + @Override + public void setCompressionThreshold(int threshold) { + this.compressionThreshold = threshold; + } + +} diff --git a/src/main/java/net/spy/memcached/transcoders/compression/SnappyCompressionCodec.java b/src/main/java/net/spy/memcached/transcoders/compression/SnappyCompressionCodec.java new file mode 100644 index 000000000..3b80d196d --- /dev/null +++ b/src/main/java/net/spy/memcached/transcoders/compression/SnappyCompressionCodec.java @@ -0,0 +1,55 @@ +package net.spy.memcached.transcoders.compression; + + +import java.io.IOException; + +import net.spy.memcached.compat.SpyObject; + +import org.xerial.snappy.Snappy; + +public class SnappyCompressionCodec extends SpyObject implements CompressionCodecIF { + + private volatile int compressionThreshold = 16_384; + + @Override + public byte[] compress(byte[] data) { + if (data == null) { + throw new NullPointerException("Can't compress null"); + } + + try { + byte[] compressed = Snappy.compress(data); + getLogger().debug("Compressed %d bytes to %d", data.length, compressed.length); + return compressed; + } catch (IOException e) { + throw new RuntimeException("IO exception compressing data", e); + } + } + + @Override + public byte[] decompress(byte[] data) { + if (data == null) { + return null; + } + + try { + byte[] decompress = Snappy.uncompress(data); + getLogger().debug("Decompressed %d bytes to %d", data.length, decompress.length); + return decompress; + } catch (IOException e) { + getLogger().warn("Failed to decompress data, returning null", e); + return null; + } + } + + @Override + public boolean isCompressionCandidate(byte[] data) { + return data != null && data.length > compressionThreshold; + } + + @Override + public void setCompressionThreshold(int threshold) { + this.compressionThreshold = threshold; + } + +} diff --git a/src/test/java/net/spy/memcached/transcoders/compression/CompressionCodecTest.java b/src/test/java/net/spy/memcached/transcoders/compression/CompressionCodecTest.java new file mode 100644 index 000000000..a1a3a1259 --- /dev/null +++ b/src/test/java/net/spy/memcached/transcoders/compression/CompressionCodecTest.java @@ -0,0 +1,118 @@ +package net.spy.memcached.transcoders.compression; + +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CompressionCodecTest { + + private static final byte[] LARGE_DATA = + repeat("hello arcus ", 2000).getBytes(StandardCharsets.UTF_8); + + + private static String repeat(String s, int times) { + StringBuilder sb = new StringBuilder(s.length() * times); + for (int i = 0; i < times; i++) { + sb.append(s); + } + return sb.toString(); + } + + // ---- GZIP ---- + + @Test + void GZIPCompressAndDecompress() { + GZIPCompressionCodec codec = new GZIPCompressionCodec(); + byte[] compressed = codec.compress(LARGE_DATA); + byte[] decompressed = codec.decompress(compressed); + + assertNotNull(compressed); + assertTrue(compressed.length < LARGE_DATA.length); + assertArrayEquals(LARGE_DATA, decompressed); + } + + @Test + void GZIPCompressNullThrows() { + assertThrows(NullPointerException.class, () -> new GZIPCompressionCodec().compress(null)); + } + + @Test + void GZIPDecompressNullReturnsNull() { + assertNull(new GZIPCompressionCodec().decompress(null)); + } + + @Test + void GZIPDecompressInvalidDataReturnsNull() { + byte[] invalid = "not gzip data".getBytes(StandardCharsets.UTF_8); + assertNull(new GZIPCompressionCodec().decompress(invalid)); + } + + @Test + void GZIPIsCompressionCandidateDefaultThreshold() { + GZIPCompressionCodec codec = new GZIPCompressionCodec(); + assertFalse(codec.isCompressionCandidate(new byte[100])); + assertTrue(codec.isCompressionCandidate(new byte[16385])); + assertFalse(codec.isCompressionCandidate(null)); + } + + @Test + void GZIPSetCompressionThreshold() { + GZIPCompressionCodec codec = new GZIPCompressionCodec(); + codec.setCompressionThreshold(100); + assertFalse(codec.isCompressionCandidate(new byte[50])); + assertTrue(codec.isCompressionCandidate(new byte[101])); + } + + // ---- Snappy ---- + + @Test + void SnappyCompressAndDecompress() { + SnappyCompressionCodec codec = new SnappyCompressionCodec(); + byte[] compressed = codec.compress(LARGE_DATA); + byte[] decompressed = codec.decompress(compressed); + + assertNotNull(compressed); + assertTrue(compressed.length < LARGE_DATA.length, "압축 후 크기가 원본보다 작아야 한다"); + assertArrayEquals(LARGE_DATA, decompressed); + } + + @Test + void SnappyCompressNullThrows() { + assertThrows(NullPointerException.class, () -> new SnappyCompressionCodec().compress(null)); + } + + @Test + void SnappyDecompressNullReturnsNull() { + assertNull(new SnappyCompressionCodec().decompress(null)); + } + + @Test + void SnappyDecompressInvalidDataReturnsNull() { + byte[] invalid = "not snappy data".getBytes(StandardCharsets.UTF_8); + assertNull(new SnappyCompressionCodec().decompress(invalid)); + } + + @Test + void SnappyIsCompressionCandidateDefaultThreshold() { + SnappyCompressionCodec codec = new SnappyCompressionCodec(); + assertFalse(codec.isCompressionCandidate(new byte[100])); + assertTrue(codec.isCompressionCandidate(new byte[16385])); + assertFalse(codec.isCompressionCandidate(null)); + } + + @Test + void SnappySetCompressionThreshold() { + SnappyCompressionCodec codec = new SnappyCompressionCodec(); + codec.setCompressionThreshold(100); + assertFalse(codec.isCompressionCandidate(new byte[50])); + assertTrue(codec.isCompressionCandidate(new byte[101])); + } + +}