11import zlib
2-
3- from reencode import reencode
2+ from dataclasses import dataclass
43
54import deflate
65import zopfli .zlib
76
7+ from reencode import Huffman , lz77 , reencode
8+
9+
@dataclass(frozen=True)
class CompressionInfo:
    """Immutable record describing how a winning candidate was produced."""

    # Label of the compressor setting, e.g. "zlib(level=9)".
    method: str
    # Window value the candidate was generated for (see WINDOWS).
    window: int
    # Quote character wrapped around the embedded payload literal.
    delimiter: str
    # True when the stream went through reencode() before sanitizing.
    reencode: bool
816
9- ZOPFLI_ITERS = [15 , 128 ]
10- LIBDEFLATE_LEVELS = [11 , 12 ]
11- ZLIB_LEVELS = [9 ]
12- DELIMS = [b"'" , b'"' ]
13- WINDOWS = [- 9 , - 10 ]
1417
# Search space explored by compress(): every combination is tried and the
# shortest resulting program wins.

# `numiterations` values handed to zopfli.zlib.compress.
ZOPFLI_ITERS: list[int] = [15, 128]
# `compresslevel` values handed to deflate.deflate_compress.
LIBDEFLATE_LEVELS: list[int] = [11, 12]
# `level` values handed to zlib.compress.
ZLIB_LEVELS: list[int] = [9]
# Quote bytes tried as the delimiter of the embedded payload literal.
DELIMS: list[bytes] = [b"'", b'"']
# Window values tried with zlib; -10 is compressed with wbits=-15.
WINDOWS: list[int] = [-9, -10]
1523
16- def sanitize (b_in : bytes , delim : bytes , use_reencode : bool = True ) -> bytes :
17- if use_reencode :
18- b_in = reencode (b_in , delim )
24+
25+ def _hoist_import (src : bytes ) -> tuple [bytes , bytes ]:
26+ if src .startswith (b"import" ):
27+ module = src .split ()[1 ]
28+ return src [len (module ) + 8 :], b"," + module
29+ return src , b""
30+
31+
32+ def _sanitize (b_in : bytes , delim : bytes ) -> bytes :
1933 b_out = bytearray ()
2034 for b , b_next in zip (b_in , [* b_in [1 :], 0 ]):
2135 if b == 0 :
@@ -33,74 +47,103 @@ def sanitize(b_in: bytes, delim: bytes, use_reencode: bool = True) -> bytes:
3347 return bytes (b_out )
3448
3549
36- def compress (src : bytes ) -> tuple [bytes , dict ]:
37- candidates : list [tuple [bytes , dict ]] = []
38-
39- # import hoisting: "import zlib,re"
40- hoisted_import = b""
41- if src .startswith (b"import" ):
42- module = src .split ()[1 ]
43- hoisted_import = b"," + module
44- src = src [len (module ) + 8 :]
45-
def _wrap(deflate_data: bytes, delim: bytes, hoisted: bytes, window: int) -> bytes:
    """Embed a deflate stream in a self-extracting Python program.

    The stream is passed through ``reencode`` and ``_sanitize`` and placed in
    a string literal; the generated program decodes that literal as Latin-1,
    decompresses it with ``zlib.decompress`` and ``exec``s the result.

    Args:
        deflate_data: Compressed payload to embed.
        delim: Quote byte used to delimit the payload literal.
        hoisted: Extra text appended to the stub's ``import zlib`` statement
            (as produced by ``_hoist_import``), or ``b""``.
        window: ``wbits`` value the stub passes to ``zlib.decompress``.

    Returns:
        The complete program as bytes.
    """
    sanitized = _sanitize(reencode(deflate_data, delim), delim)

    if window == -10:
        # ``~9`` evaluates to -10 and is one byte shorter than ``-10``.
        window_str = b",~9"
    elif window == 15:
        # 15 is zlib.decompress's default wbits, so the argument is omitted.
        window_str = b""
    else:
        window_str = b",%d" % window

    # Each stub line must start at column 0: a leading space before
    # ``import``/``exec`` would be an IndentationError in the output.
    return b"".join(
        (
            b"#coding:L1\nimport zlib",
            hoisted,
            b"\nexec(zlib.decompress(bytes(",
            delim,
            sanitized,
            delim,
            b',"L1")',
            window_str,
            b"))",
        )
    )
64+
65+
def compress(src: bytes) -> tuple[bytes, CompressionInfo]:
    """Return the shortest self-extracting program for *src*.

    Every compressor setting (zopfli, libdeflate, zlib) is combined with
    every delimiter in ``DELIMS`` and with/without ``reencode``; the smallest
    resulting program is returned.

    Args:
        src: Python source code to compress.

    Returns:
        ``(program, info)`` where *program* is the generated stub and *info*
        records which combination produced it.
    """
    src, hoisted = _hoist_import(src)

    # (deflate stream, method label, window) for each compressor setting.
    compressed_data: list[tuple[bytes, str, int]] = []

    for iters in ZOPFLI_ITERS:
        full: bytes = zopfli.zlib.compress(
            src,
            numiterations=iters,
            blocksplitting=False,
        )
        # Drop the first 2 and last 4 bytes (zlib header and checksum
        # trailer) to keep only the deflate stream.
        result = full[2:-4]
        # The high nibble of the first header byte encodes the window size.
        actual_window = -(((full[0] >> 4) & 0x0F) + 8)
        method = f"zopfli(iters={iters})"
        compressed_data.append((result, method, -10))
        if actual_window != -10:
            # NOTE(review): actual_window is always negative here, so the
            # ``< 15`` test is always true and -9 is always chosen. Kept
            # as-is; confirm the intended comparison.
            compressed_data.append(
                (result, method, -9 if actual_window < 15 else actual_window)
            )

    for level in LIBDEFLATE_LEVELS:
        compressed_data.append(
            (
                bytes(deflate.deflate_compress(src, compresslevel=level)),
                f"libdeflate(level={level})",
                -10,
            )
        )

    for level in ZLIB_LEVELS:
        for window in WINDOWS:
            result = zlib.compress(
                src,
                level=level,
                # -10 candidates are compressed with wbits=-15.
                wbits=-15 if window == -10 else window,
            )
            compressed_data.append((result, f"zlib(level={level})", window))

    candidates: list[tuple[bytes, CompressionInfo]] = []
    for data, method, window in compressed_data:
        # The wbits argument text depends only on the window, so build it
        # once per stream rather than once per delimiter/reencode variant.
        if window == -10:
            # ``~9`` evaluates to -10 and is one byte shorter than ``-10``.
            window_str = b",~9"
        elif window == 15:
            # 15 is zlib.decompress's default wbits, so nothing is emitted.
            window_str = b""
        else:
            window_str = b",%d" % window

        for delim in DELIMS:
            for use_reencode in (True, False):
                payload = reencode(data, delim) if use_reencode else data
                sanitized = _sanitize(payload, delim)
                # Stub lines must start at column 0; a leading space before
                # ``import``/``exec`` would be an IndentationError.
                code = b"".join(
                    (
                        b"#coding:L1\nimport zlib",
                        hoisted,
                        b"\nexec(zlib.decompress(bytes(",
                        delim,
                        sanitized,
                        delim,
                        b',"L1")',
                        window_str,
                        b"))",
                    )
                )
                candidates.append(
                    (
                        code,
                        CompressionInfo(
                            method=method,
                            window=window,
                            # Latin-1 maps every byte to a character, so this
                            # cannot fail and matches the stub's coding.
                            delimiter=delim.decode("latin-1"),
                            reencode=use_reencode,
                        ),
                    )
                )

    return min(candidates, key=lambda x: len(x[0]))
143+
144+
def compress_frozen(src: bytes, huffman_hex: str) -> bytes:
    """Compress *src* with a fixed Huffman table and return the smallest stub.

    Args:
        src: Python source code to compress.
        huffman_hex: Hex string that is decoded to bytes and handed to
            ``Huffman.parse``.

    Returns:
        The shortest program produced by ``_wrap`` (with window -10) across
        the delimiters in ``DELIMS``.
    """
    body, hoisted = _hoist_import(src)
    table = Huffman.parse(bytes.fromhex(huffman_hex))
    # One candidate per delimiter; min() keeps the first of equal lengths.
    return min(
        (_wrap(lz77(body, table, delim), delim, hoisted, -10) for delim in DELIMS),
        key=len,
    )
0 commit comments