Source code for compressed_string_mod
#!/usr/bin/python
"""Provides functions for compressing and decompressing strings, all in memory."""
# 0: Unused
# 1: Not compressed
# 2: Compressed with pack: .z
# 3: Compressed with compress: .Z
# 4: Compressed with gzip: .gz
# 5: Compressed with bzip2: .bz2 (quite a bit slower than gzip, but packs harder)
# 6: Compressed with xz -2: .xz (quite a bit faster than bz2, only a tiny bit larger)
# 7: Compressed with xz -6: .xz (packs harder than bzip2, but slower)
#
# Brotli may prove worth adding:
# http://tech.slashdot.org/story/15/09/22/1723219/google-launches-brotli-a-new-open-source-compression-algorithm-for-the-web?utm_source=rss1.0mainlinkanon&utm_medium=feed
#
# At this time, only 1, 5 and 7 are used
import sys
try:
import bz2 as bz2_mod
except ImportError:
HAVE_BZ2 = False
else:
HAVE_BZ2 = True
import constants_mod
import helpers
import stringio
import xz_mod
[docs]def compress_string(data, bz2_announced=False):
"""
Compress a string.
1) If we have xz, try to use it.
2) Fallback: if we have bz2, try to use that.
3) Else save without compressing.
4) Or if the compressed version is larger, save without compressing then too.
"""
try:
compressed_data = xz_mod.compress(data)
compression_type = 7
compressed_ok = True
except OSError:
if HAVE_BZ2:
if not bz2_announced:
sys.stderr.write('%s: warning: falling back to bzip2 compression due to lack of xz\n' % sys.argv[0])
bz2_announced = True
compressed_data = bz2_mod.compress(data)
compression_type = 5
compressed_ok = True
else:
compressed_ok = False
if not compressed_ok or len(data) < len(compressed_data):
# Many strings don't compress - they instead get larger. So we catch that and allow them to increase
# by 2 bytes only.
result = constants_mod.Constants.b_onenewline + data
else:
result = \
helpers.string_to_binary('%d' % compression_type) + \
constants_mod.Constants.b_newline + \
compressed_data
# print('compress_string: len(result): %s' % len(result))
# print('compress_string: result[:10]: %s' % result[:10])
binary_result = helpers.string_to_binary(result)
# print('compress_string: len(binary_result): %s' % len(binary_result))
return binary_result
[docs]def decompress_string(compressed_data, zero_length_ok=False):
"""Uncompress a string."""
# print('decompress_string: compressed_data: %d' % len(compressed_data))
memory_file = stringio.StringIO(compressed_data)
compression_type = memory_file.readline().rstrip()
remainder = memory_file.read()
# print('decompress_string: remainder: %d' % len(remainder))
if compression_type == constants_mod.Constants.b_7:
decompressed_data = xz_mod.decompress(remainder)
elif compression_type == constants_mod.Constants.b_5:
decompressed_data = bz2_mod.decompress(remainder)
elif compression_type == constants_mod.Constants.b_1:
decompressed_data = remainder
elif zero_length_ok and compression_type == helpers.empty_bytes:
decompressed_data = helpers.empty_bytes
else:
raise ValueError('Did not get a valid compression type from compressed_data')
return decompressed_data