Source code for xz_mod

#!/usr/bin/env python

# pylint: disable=W0404,F0401,simplifiable-if-statement,superfluous-parens
# W0404: pylint -thinks- we're reimporting ctypes.util, but we aren't really
# F0401: We can't always import lzma, but don't stress about that - it only exists in 3.3
# simplifiable-if-statement: We use module-level variables to tell us what's importing and what's not
# superfluous-parens: Parentheses are good for portability and clarity

"""Impersonate the real xz module, which has yet to be written and/or added to CPython, by using the xz binary."""

import os
import tempfile
import subprocess

try:
    import ctypes
    import ctypes.util
except ImportError:
    HAVE_CTYPES = False
else:
    HAVE_CTYPES = True

try:
    import lzma
except ImportError:
    try:
        # Sometimes Pypy 2.x will have this, perhaps CPython too.  It's a CFFI lzma module.
        import lzmaffi as lzma
    except ImportError:
        HAVE_LZMA = False
    else:
        HAVE_LZMA = True
else:
    HAVE_LZMA = True


class DecompressionError(Exception):
    """Signal that a block of data could not be decompressed.

    This should be rare in practice, but it is not impossible.
    """

    # At present only the MEANS = 'lzma module' code path raises this.
# Work out the operating system name.  FUNCTION is fetched with getattr() and is
# only bound when the running interpreter actually provides os.uname().
try:
    FUNCTION = getattr(os, 'uname')
except AttributeError:
    # This is probably Jython, but Unknown is good enough for now, and more likely to be accurate in the future
    # as the code adapts to new requirements.  We really only care about Cygwin for now, and that has an os.uname().
    UNAME = 'Unknown'
else:
    UNAME = FUNCTION()[0]

# This module does the same thing, compression/decompression, in two different ways:
# 1) The normal way, via the subprocess module (which sometimes has problems on Cygwin, because Cygwin merely
#    emulates fork, and does so a bit poorly: read "slowly, and unreliably").  This variant uses no temporary
#    files.
# 2) Via os.popen, which has fallen into disfavor, but seems to work better on Windows.  This variant uses
#    one temporary file for every compress, and one temporary file for every decompress.
# Other possibilities:
# 3) Use the multiprocessing module - that purportedly works fine on Windows, but it isn't included in Python
#    until 2.6
# 4) Write a ctypes-based module - this risks having problems on PyPy, given my experience with gdbm_ctypes
# 5) Use a cython module - again, PyPy issues
# 6) Use a C extension module - again, PyPy issues
# Note that neither the generic version nor the Cygwin version is especially fast in this version of xz_mod, though
# the generic version is faster than the Cygwin version.
def find_xz():
    """Find the xz binary, if any.

    Walks the directories listed in the PATH environment variable, in order, and
    returns the full path of the first regular file named 'xz' that the current
    user is allowed to execute.

    Returns None when PATH is not set or when no such file is found.
    """
    path = os.environ.get('PATH')
    if path is None:
        # No PATH means there is nowhere to look: report "not found" instead of raising KeyError.
        return None

    # os.pathsep is ':' on POSIX (including Cygwin) and ';' on native Windows.
    for directory in path.split(os.pathsep):
        candidate_executable = os.path.join(directory, 'xz')
        # os.access(..., os.X_OK) is also true for a searchable directory, so insist on a regular file.
        if os.path.isfile(candidate_executable) and os.access(candidate_executable, os.X_OK):
            return candidate_executable

    return None
def _find_lib(library):
    """Find our libraries, one at a time.

    Asks ctypes.util.find_library() first.  If that comes back empty, probes a
    few well-known locations by hand, trying version numbers 11 down to 2:
    Cygwin DLL's under /bin first, then Haiku shared objects under
    /boot/common/lib.

    library is the bare library name, e.g. 'lzma'.

    Returns whatever find_library() returned, or the filename of the first
    hand-probed candidate that exists, or None if nothing was found.
    """
    normal_result = ctypes.util.find_library(library)
    if normal_result is not None:
        return normal_result

    # find_library doesn't appear to be able to find cygwin DLL's, so
    # we have extra code to help with that - in part because fork+exec
    # is very slow on windows, so we really want the ctypes version
    # of xz there.  The second template is for Haiku.
    candidate_templates = (
        '/bin/cyg%s-%d.dll',
        '/boot/common/lib/lib%s.so.%d',
    )
    for template in candidate_templates:
        for version in range(11, 1, -1):
            candidate_filename = template % (library, version)
            # Probe silently: this runs at import time, so it must not write to stdout.
            if os.path.isfile(candidate_filename):
                return candidate_filename

    # Didn't find library
    return None


XZ_PATH = find_xz()
class Xz_ctypes(object):
    """A class to compress and decompress xz format, using ctypes to access liblzma.

    Everything lives at class level: class_init() must be called once to locate
    and load liblzma and to declare the C functions used, after which compress()
    and decompress() can be called on the class or on any instance.
    """

    # lzma_ret: the codes returned by the liblzma functions we call.
    LZMA_OK = 0
    LZMA_STREAM_END = 1
    LZMA_NO_CHECK = 2
    LZMA_UNSUPPORTED_CHECK = 3
    LZMA_GET_CHECK = 4
    LZMA_MEM_ERROR = 5
    LZMA_MEMLIMIT_ERROR = 6
    LZMA_FORMAT_ERROR = 7
    LZMA_OPTIONS_ERROR = 8
    LZMA_DATA_ERROR = 9
    LZMA_BUF_ERROR = 10
    LZMA_PROG_ERROR = 11

    # Decoder flags.  These are NOT lzma_ret values, even though the numbers overlap.
    LZMA_TELL_NO_CHECK = 1
    LZMA_TELL_UNSUPPORTED_CHECK = 2

    # Integrity check selector and compression preset handed to the encoder.
    LZMA_CHECK_CRC32 = 1
    LZMA_PRESET_DEFAULT = 6

    long_message = 'LZMA_MEMLIMIT_ERROR: Memory usage limit was reached. ' + \
        'minimum required memlimit value was stored to *memlimit'

    long_string = "LZMA_FORMAT_ERROR: Magic bytes don't match, thus the given buffer cannot be Stream Header."

    # Maps lzma_ret values - and only lzma_ret values - to descriptions.  Flags, check
    # selectors and presets share numbers with return codes, so listing them here
    # would silently replace the description of the return code with the same number.
    error_dict = {
        LZMA_OK: 'LZMA_OK',
        LZMA_STREAM_END: 'LZMA_STREAM_END',
        LZMA_NO_CHECK: 'LZMA_NO_CHECK',
        LZMA_UNSUPPORTED_CHECK: 'LZMA_UNSUPPORTED_CHECK',
        LZMA_GET_CHECK: 'LZMA_GET_CHECK',
        LZMA_MEM_ERROR: 'LZMA_MEM_ERROR',
        LZMA_MEMLIMIT_ERROR: long_message,
        LZMA_FORMAT_ERROR: long_string,
        LZMA_OPTIONS_ERROR: 'LZMA_OPTIONS_ERROR',
        LZMA_DATA_ERROR: 'LZMA_DATA_ERROR',
        LZMA_BUF_ERROR: 'LZMA_BUF_ERROR: Output buffer was too small',
        LZMA_PROG_ERROR: 'LZMA_PROG_ERROR',
    }

    # C functions extracted from liblzma, keyed by their undecorated name.
    funcs = {}

    def __init__(self):
        """Initialize."""

    @classmethod
    def declare_c_function(cls, library, name, argtypes=None, restype=None):
        """Extract functions from liblzma.

        Looks up '_<name>' in library first and falls back to plain '<name>',
        stores the function in cls.funcs[name], and sets its argtypes and
        restype when those are given.  Raises AttributeError if neither
        spelling exists in library.
        """
        try:
            func = getattr(library, '_%s' % name)
        except AttributeError:
            func = getattr(library, name)
        cls.funcs[name] = func

        if argtypes is not None:
            func.argtypes = argtypes
        if restype is not None:
            func.restype = restype

    @classmethod
    def class_init(cls):
        """Initialize the class.

        Locates liblzma, loads it, and declares the three functions this class
        uses.  Raises ImportError if the library cannot be found.
        """
        cls.LZMA_LIBPATH = _find_lib('lzma')
        if cls.LZMA_LIBPATH is None:
            raise ImportError('Could not find lzma library')

        # *ix way - windows is different
        cls.LZMA_LIB = ctypes.CDLL(cls.LZMA_LIBPATH)

        cls.declare_c_function(
            cls.LZMA_LIB,
            'lzma_stream_buffer_bound',
            (ctypes.c_size_t, ),
            ctypes.c_size_t,
        )

        # lzma_easy_buffer_encode notes:
        # Argument 1, uint32_t preset:
        #    Just a uint32 - simple
        # Argument 2, lzma_check check:
        #    An enum to the C programmer - that should usually be an unsigned integer in the C runtime.
        # Argument 3, lzma_allocator *lzma_allocator:
        #    This is really a pointer to a struct, but happily, we only need to pass NULL to it,
        #    so we can just treat it as a void *
        # Argument 4: uint8_t *in:
        # Argument 5: size_t in_size:
        # Argument 6: uint8_t *out
        # Argument 7: size_t *out_pos
        # Argument 8: size_t out_size
        #
        # The return type is also an enum, so probably an unsigned int
        cls.declare_c_function(
            cls.LZMA_LIB,
            'lzma_easy_buffer_encode',
            (
                ctypes.c_uint32,  # preset
                ctypes.c_uint,  # check
                ctypes.c_void_p,  # lzma_allocator
                ctypes.POINTER(ctypes.c_uint8),  # uint8_t *in
                ctypes.c_size_t,  # size_t in_size
                ctypes.POINTER(ctypes.c_uint8),  # uint8_t *out
                ctypes.POINTER(ctypes.c_size_t),  # size_t *out_pos
                ctypes.c_size_t,  # size_t out_size
            ),
            ctypes.c_uint,
        )

        cls.declare_c_function(
            cls.LZMA_LIB,
            'lzma_stream_buffer_decode',
            (
                ctypes.POINTER(ctypes.c_uint64),  # uint64_t *memlimit
                ctypes.c_uint32,  # uint32_t flags
                ctypes.c_void_p,  # lzma_allocator *allocator,
                ctypes.POINTER(ctypes.c_uint8),  # const uint8_t *in
                ctypes.POINTER(ctypes.c_size_t),  # size_t *in_pos
                ctypes.c_size_t,  # size_t in_size,
                ctypes.POINTER(ctypes.c_uint8),  # uint8_t *out
                ctypes.POINTER(ctypes.c_size_t),  # size_t *out_pos
                ctypes.c_size_t,  # size_t out_size
            ),
            ctypes.c_uint,
        )

    @classmethod
    def get_xz_error(cls, ret_xz):
        """Decode an lzma_ret enum to an at-least-somewhat-descriptive string."""
        if ret_xz in cls.error_dict:
            return cls.error_dict[ret_xz]
        return 'Unrecognized lzma_ret value: %d' % ret_xz

    @classmethod
    def compress(cls, input_data):
        """Compress data into xz format using ctypes to access liblzma.so.

        input_data is a bytes string.  Returns the compressed bytes, produced
        with the default preset and a CRC32 integrity check.  Raises OSError,
        carrying a description of the lzma_ret code, if liblzma reports
        anything other than LZMA_OK.
        """
        # maximum_size = lzma_stream_buffer_bound(input_buffer_size);
        length_input_data = len(input_data)
        maximum_size = cls.funcs['lzma_stream_buffer_bound'](length_input_data)

        # This is an efficient way of creating a readonly ctypes string
        ctypes_input_data_char_p = ctypes.c_char_p(input_data)
        ctypes_input_data = ctypes.cast(ctypes_input_data_char_p, ctypes.POINTER(ctypes.c_ubyte))

        # Here is a less-fast but mutable way of creating a ctypes string.
        # This works most of the time, but pypy2 5.10.0 has problems with it:
        # ctypes_compressed_buffer_char_p = ctypes.create_string_buffer(maximum_size)
        # This is a viable alternative:
        ctypes_compressed_buffer_char_p = ctypes.create_string_buffer(b'\0' * maximum_size)
        ctypes_compressed_buffer = ctypes.cast(ctypes_compressed_buffer_char_p, ctypes.POINTER(ctypes.c_ubyte))

        # liblzma writes the number of output bytes it produced through this pointer.
        ctypes_compressed_size = ctypes.c_size_t(0)
        ctypes_compressed_size_pointer = ctypes.pointer(ctypes_compressed_size)

        # lzma_easy_buffer_encode
        ret_xz = cls.funcs['lzma_easy_buffer_encode'](
            cls.LZMA_PRESET_DEFAULT,
            cls.LZMA_CHECK_CRC32,
            None,
            ctypes_input_data,
            length_input_data,
            ctypes_compressed_buffer,
            ctypes_compressed_size_pointer,
            maximum_size,
        )

        if ret_xz != cls.LZMA_OK:
            raise OSError(cls.get_xz_error(ret_xz))

        resultant_length = int(ctypes_compressed_size.value)
        return ctypes_compressed_buffer_char_p.raw[:resultant_length]

    @classmethod
    def decompress(cls, input_data, max_result_size=2 ** 26):
        # pylint: disable=R0914
        # R0914: We need some locals for this one
        """Uncompress data from xz format using ctypes to access liblzma.so.

        input_data is a bytes string holding an xz stream.  max_result_size is
        used both as the decoder's memory limit and as the size of the output
        buffer, so a result larger than that cannot be returned.  Returns the
        uncompressed bytes.  Raises OSError, carrying a description of the
        lzma_ret code, if liblzma reports anything other than LZMA_OK.
        """
        ctypes_memlimit = ctypes.c_uint64(max_result_size)
        ctypes_memlimit_pointer = ctypes.pointer(ctypes_memlimit)

        ctypes_input_data_char_p = ctypes.c_char_p(input_data)
        ctypes_input_data = ctypes.cast(ctypes_input_data_char_p, ctypes.POINTER(ctypes.c_ubyte))

        ctypes_in_pos = ctypes.c_size_t(0)
        ctypes_in_pos_pointer = ctypes.pointer(ctypes_in_pos)

        in_size = ctypes.c_size_t(len(input_data))

        ctypes_uncomp_buffer_char_p = ctypes.create_string_buffer(max_result_size)
        ctypes_uncompressed_buffer = ctypes.cast(ctypes_uncomp_buffer_char_p, ctypes.POINTER(ctypes.c_ubyte))

        # liblzma writes the number of output bytes it produced through this pointer.
        ctypes_out_pos = ctypes.c_size_t(0)
        ctypes_out_pos_pointer = ctypes.pointer(ctypes_out_pos)

        out_size = ctypes.c_size_t(max_result_size)

        ret_xz = cls.funcs['lzma_stream_buffer_decode'](
            ctypes_memlimit_pointer,  # uint64_t *memlimit
            cls.LZMA_TELL_NO_CHECK,  # uint32_t flags
            None,  # lzma_allocator *allocator,
            ctypes_input_data,  # const uint8_t *in
            ctypes_in_pos_pointer,  # size_t *in_pos
            in_size,  # size_t in_size,
            ctypes_uncompressed_buffer,  # uint8_t *out
            ctypes_out_pos_pointer,  # size_t *out_pos
            out_size,  # size_t out_size
        )

        if ret_xz != cls.LZMA_OK:
            raise OSError(cls.get_xz_error(ret_xz))

        resultant_length = int(ctypes_out_pos.value)
        return ctypes_uncomp_buffer_char_p.raw[:resultant_length]
# Prepare the ctypes-backed implementation when ctypes itself is importable.
# XZ_CTYPES stays None when it could not be prepared.
XZ_CTYPES = None
if HAVE_CTYPES:
    try:
        XZ_CTYPES = Xz_ctypes()
        XZ_CTYPES.class_init()
    except ImportError:
        # class_init() raises ImportError when liblzma cannot be located.  That must not
        # abort the import of this whole module: use_ctypes() performs the same library
        # lookup and will report False, so one of the other backends gets selected instead.
        XZ_CTYPES = None
def use_lzma():
    """Report whether the lzma module was imported and offers FORMAT_XZ."""
    if not HAVE_LZMA:
        return False
    return hasattr(lzma, 'FORMAT_XZ')
def use_ctypes():
    """Report whether ctypes was imported and an lzma library can be located."""
    if not HAVE_CTYPES:
        return False
    return _find_lib('lzma') is not None
def use_popen():
    """Report whether the popen version should be used, i.e. whether UNAME names a Cygwin system."""
    return UNAME.startswith('CYGWIN')
def use_subprocess():
    """Report whether the subprocess version can be used, i.e. whether an xz binary was found."""
    return XZ_PATH is not None
# Pick exactly one backend, in order of preference, and define the module-level
# compress(data) / decompress(data) pair on top of it.  MEANS names the backend chosen.
if use_lzma():
    MEANS = 'lzma module'

    def compress(data):
        # pylint: disable=no-member
        # Some python's don't have these
        """Compress a block of data using lzma/xz."""
        return lzma.compress(data, format=lzma.FORMAT_XZ, preset=lzma.PRESET_DEFAULT, check=lzma.CHECK_CRC32)

    def decompress(data):
        # pylint: disable=no-member
        # Some python's don't have these
        """Decompress a block of data using lzma/xz.

        Raises DecompressionError if the lzma module rejects the data.
        """
        try:
            return lzma.decompress(data, format=lzma.FORMAT_XZ)
        except lzma.LZMAError as error:
            # Keep the underlying reason in the message instead of discarding it.
            raise DecompressionError(str(error))

elif use_popen():
    MEANS = 'Windows popen'

    def _popen_filter(option, data, suffix, failure_message):
        """Feed data to `xz <option>` through a temporary file and os.popen; return xz's output.

        option is the xz command line flag ('-z' or '-d'), suffix is the
        temporary file's suffix, and failure_message is the text of the OSError
        raised when xz does not exit successfully.  The temporary file is
        removed whether or not xz succeeds.
        """
        if XZ_PATH is None:
            raise OSError('xz not found')

        temp_file = tempfile.NamedTemporaryFile(mode='w+b', suffix=suffix, prefix='tmp', delete=False)
        temp_filename = temp_file.name
        try:
            try:
                temp_file.write(data)
            finally:
                temp_file.close()

            # NOTE(review): Python 3's os.popen appears to accept only 'r' and 'w' modes, so this
            # branch presumably only works on Python 2 - confirm before relying on it elsewhere.
            pipe = os.popen('%s %s < %s' % (XZ_PATH, option, temp_filename), 'rb')
            result = pipe.read()
            retval = pipe.close()
        finally:
            # Always clean up, including when the write, the pipe or xz itself failed.
            os.unlink(temp_filename)

        # close() gives None on success.  Any other status is a failure - including statuses
        # below 256, which dividing by 256 used to round down to "success".
        if retval:
            raise OSError(failure_message)
        return result

    def compress(data):
        """Compress data using an xz executable and the popen function (to avoid fork on Windows).

        Uses temporary files.
        """
        return _popen_filter('-z', data, '.backshifttemp', 'compress failed')

    def decompress(data):
        """Decompress data using an xz executable and the popen function (to avoid fork on Windows).

        Uses temporary files.
        """
        return _popen_filter('-d', data, '.backshiftemp', 'decompress failed')

elif use_ctypes():
    MEANS = 'ctypes'

    def compress(data):
        """Compress using ctypes - just hand off to XZ_CTYPES."""
        return XZ_CTYPES.compress(data)

    def decompress(data):
        """Decompress using ctypes - just hand off to XZ_CTYPES."""
        return XZ_CTYPES.decompress(data)

elif use_subprocess():
    MEANS = 'xz subprocess'

    def _subprocess_filter(option, data, failure_prefix):
        # pylint: disable=E1101
        # E1101: actually Popen objects do have a returncode member
        """Pipe data through `xz <option>` using the subprocess module; return xz's stdout.

        Raises OSError, with failure_prefix followed by whatever xz wrote to
        stderr, when xz exits with a nonzero return code.
        """
        if XZ_PATH is None:
            raise OSError('xz not found')

        # stderr has to be piped too, otherwise communicate() hands back None for it
        # and the error message below could never say what went wrong.
        subp = subprocess.Popen(
            [XZ_PATH, option],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = subp.communicate(input=data)
        if subp.returncode != 0:
            raise OSError('%s: %s' % (failure_prefix, stderr.decode('utf-8', 'replace').strip()))
        return stdout

    def compress(data):
        """Compress data using an xz executable and the subprocess module."""
        return _subprocess_filter('-z', data, 'Chunk failed to compress')

    def decompress(data):
        """Decompress data using an xz executable and the subprocess module."""
        return _subprocess_filter('-d', data, 'Chunk failed to decompress')

else:
    raise ValueError('No suitable form of xz compression found')