[bmap-tools] [PATCH 2/8] TransRead: re-write decompress logic

Artem Bityutskiy dedekind1 at gmail.com
Fri Jan 31 06:58:45 EST 2014


From: Artem Bityutskiy <artem.bityutskiy at intel.com>

This is a relatively big change which completerly re-writes the decompression
logic of this module. I did not split this change on many smaller changes,
since this is difficult to do, and I am trying to optimize my time usage. Yes,
bad explanation, but honest :-) But the diff is not that big anyway!

At the moment, TransRead tries to use various python tools for decompressing:
the 'bz2', 'tarfile', and 'zlib' modules. For 'xz', we are using the 'lzma'
module from backports, which is not present everywhere, e.g., OpenSuse does not
have it.

This worked relatively well, until I got these bug-reports and requrests:
Out-of-memory failure: https://bugs.tizen.org/jira/browse/TIVI-2388
pbzip2 support request: https://bugs.tizen.org/jira/browse/DEVT-141
lzo support request: https://bugs.tizen.org/jira/browse/DEVT-140

The first is very difficult to fix. We already pass only 128 bytes of data to
the bz2 decompressor. Makin it smaller would probably help, but would probably
fail on a system with even less memory. I tried to handle the MemoryError
exceptions, but the bz2 decompressor objects becomes unusable after the
MemoryError exception.

Then pbzip2 - the standard python 2.x 'bz2' library just does not support
multiple streams. I tried to use the 'bz2file' backport from python 3.x, but it
is really not very user-friendly, since users need to install it from PyPI.

Then 'lzo' support is absent in python 2.x. There is the 'python-lzo' package
in some distros providing the lzo functionality, but it is again, not available
in OpenSuse.

So I figured that this is too much of the trubles and tried to dump all the
decompression cruft and just use the standard Linux tools for that: bzip2,
gzip, xz, lzop, and tar. Just piple the data from the input file to the
decompressor program's stdin, and read the uncompressed data from its stdout.
And this worked perfectly. And became faster. And the out-of-memory problems
seemed to go away. And both pbzip2 and lzo became supported. And the amount of
code became less.

So I've just decided to go this way and this patch does exactly that.

This patch also adds several standard, but rarely used extensions: .tbz2, .tbz,
.tb2, and .txz, as aliases for 'tar.bz2' and 'tar.xz'.

Change-Id: I1bd1ae8a853744c3098189c08a9802e118a50580
Signed-off-by: Artem Bityutskiy <artem.bityutskiy at intel.com>
---
 bmaptools/TransRead.py | 424 ++++++++++++++++++++++---------------------------
 1 file changed, 193 insertions(+), 231 deletions(-)

diff --git a/bmaptools/TransRead.py b/bmaptools/TransRead.py
index 50b51e7..33ec0f4 100644
--- a/bmaptools/TransRead.py
+++ b/bmaptools/TransRead.py
@@ -12,16 +12,20 @@
 """
 This module allows opening and reading local and remote files and decompress
 them on-the-fly if needed. Remote files are read using urllib2 (except of
-"ssh://" URLs, which are handled differently). Supported compression types are:
-'bz2', 'gz', 'xz', 'tar.gz', 'tgz', 'tar.bz2', 'tar.xz'.
+"ssh://" URLs, which are handled differently). Supported file extentions are:
+'bz2', 'gz', 'xz', 'lzo' and a "tar" version of them: 'tar.bz2', 'tbz2', 'tbz',
+'tb2', 'tar.gz', 'tgz', 'tar.xz', 'txz', 'tar.lzo', 'tzo'. This module uses
+the following system programs for decompressing: pbzip2, bzip2, gzip, pigz, xz,
+lzop, and tar.
 """
 
 import os
 import errno
 import urlparse
 import logging
+import threading
 import subprocess
-import BmapHelpers
+from bmaptools import BmapHelpers
 
 # Disable the following pylint errors and recommendations:
 #   * Instance of X has no member Y (E1101), because it produces
@@ -30,14 +34,16 @@ import BmapHelpers
 #   * Too many instance attributes (R0902)
 #   * Too many branches (R0912)
 #   * Too many local variables (R0914)
+#   * Too many statements (R0915)
 # pylint: disable=E1101
 # pylint: disable=R0902
 # pylint: disable=R0912
 # pylint: disable=R0914
+# pylint: disable=R0915
 
 # A list of supported compression types
-SUPPORTED_COMPRESSION_TYPES = ('bz2', 'gz', 'xz', 'tar.gz', 'tgz', 'tar.bz2',
-                               'tar.xz')
+SUPPORTED_COMPRESSION_TYPES = ('bz2', 'gz', 'xz', 'lzo', 'tar.gz', 'tar.bz2',
+                               'tar.xz', 'tar.lzo')
 
 def _fake_seek_forward(file_obj, cur_pos, offset, whence=os.SEEK_SET):
     """
@@ -83,114 +89,6 @@ class Error(Exception):
     """
     pass
 
-class _CompressedFile(object):
-    """
-    This class implements transparent reading from a compressed file-like
-    object and decompressing its contents on-the-fly.
-    """
-
-    def __init__(self, file_obj, decompress_func=None, chunk_size=None):
-        """
-        Class constructor. The 'file_ojb' argument is the compressed file-like
-        object to read from. The 'decompress_func()' function is a function to
-        use for decompression.
-
-        The 'chunk_size' parameter may be used to limit the amount of data read
-        from the input file at a time and it is assumed to be used with
-        compressed files. This parameter has a big effect on the memory
-        consumption in case the input file is a compressed stream of all
-        zeroes. If we read a big chunk of such a compressed stream and
-        decompress it, the length of the decompressed buffer may be huge. For
-        example, when 'chunk_size' is 128KiB, the output buffer for a 4GiB .gz
-        file filled with all zeroes is about 31MiB. Bzip2 is more dangerous -
-        when 'chunk_size' is only 1KiB, the output buffer for a 4GiB .bz2 file
-        filled with all zeroes is about 424MiB and when 'chunk_size' is 128
-        bytes it is about 77MiB.
-        """
-
-        self._file_obj = file_obj
-        self._decompress_func = decompress_func
-        if chunk_size:
-            self._chunk_size = chunk_size
-        else:
-            self._chunk_size = 128 * 1024
-        self._pos = 0
-        self._buffer = ''
-        self._buffer_pos = 0
-        self._eof = False
-
-    def seek(self, offset, whence=os.SEEK_SET):
-        """The 'seek()' method, similar to the one file objects have."""
-        self._pos = _fake_seek_forward(self, self._pos, offset, whence)
-
-    def tell(self):
-        """The 'tell()' method, similar to the one file objects have."""
-        return self._pos
-
-    def _read_from_buffer(self, length):
-        """Read from the internal buffer."""
-        buffer_len = len(self._buffer)
-        if buffer_len - self._buffer_pos > length:
-            data = self._buffer[self._buffer_pos:self._buffer_pos + length]
-            self._buffer_pos += length
-        else:
-            data = self._buffer[self._buffer_pos:]
-            self._buffer = ''
-            self._buffer_pos = 0
-
-        return data
-
-    def read(self, size):
-        """
-        Read the compressed file, uncompress the data on-the-fly, and return
-        'size' bytes of the uncompressed data.
-        """
-
-        assert self._pos >= 0
-        assert self._buffer_pos >= 0
-        assert self._buffer_pos <= len(self._buffer)
-
-        if self._eof:
-            return ''
-
-        # Fetch the data from the buffers first
-        data = self._read_from_buffer(size)
-        size -= len(data)
-
-        # If the buffers did not contain all the requested data, read them,
-        # decompress, and buffer.
-
-        while size > 0:
-            buf = self._file_obj.read(self._chunk_size)
-            if not buf:
-                self._eof = True
-                break
-
-            buf = self._decompress_func(buf)
-            if not buf:
-                continue
-
-            assert len(self._buffer) == 0
-            assert self._buffer_pos == 0
-
-            # The decompressor may return more data than we requested. Save the
-            # extra data in an internal buffer.
-            if len(buf) >= size:
-                self._buffer = buf
-                data += self._read_from_buffer(size)
-            else:
-                data += buf
-
-            size -= len(buf)
-
-        self._pos += len(data)
-
-        return data
-
-    def close(self):
-        """Close the '_CompressedFile' file-like object."""
-        pass
-
 def _decode_sshpass_exit_code(code):
     """
     A helper function which converts "sshpass" command-line tool's exit code
@@ -245,20 +143,21 @@ class TransRead(object):
         self.bz2file_found = False
         # Whether the file is behind an URL
         self.is_url = False
-
-        # Wait for this child process in the destructor
-        self._child_process = None
-
+        # List of child processes we forked
+        self._child_processes = []
+        # The reader thread
+        self._rthread = None
+        # This variable becomes 'True' when the instance of this class is not
+        # usable any longer.
+        self._done = False
         # There may be a chain of open files, and we save the intermediate file
         # objects in the 'self._f_objs' list. The final file object is stored
         # in th elast element of the list.
         #
-        # For example, when the path is an URL to a tar.xz file, the chain of
+        # For example, when the path is an URL to a bz2 file, the chain of
         # opened file will be:
         #   o self._f_objs[0] is the liburl2 file-like object
-        #   o self._f_objs[1] is the lzma file-like object
-        #   o self._f_objs[2] is the tarfile file-like object
-        #   o self._f_objs[3] is the tarfilemember file-like object
+        #   o self._f_objs[1] is the stdout of the 'bzip2' process
         self._f_objs = []
 
         self._force_fake_seek = False
@@ -277,25 +176,36 @@ class TransRead(object):
 
     def __del__(self):
         """The class destructor which closes opened files."""
-        for _file_obj in self._f_objs:
-            if _file_obj:
-                _file_obj.close()
+        self._done = True
+
+        for child in self._child_processes:
+            child.kill()
+
+        if self._rthread:
+            self._rthread.join()
+
+        for file_obj in self._f_objs:
+            file_obj.close()
 
-    def _open_tarfile(self):
+    def _read_thread(self, f_from, f_to):
         """
-        This is a helper function for '_open_compressed_file' which is called
-        when the file is a tar archive.
+        This function is used when reading compressed files. It runs in a
+        spearate thread, reads data from the 'f_from' file-like object, and
+        writes them to the 'f_to' file-like object. 'F_from' may be a urllib2
+        object, while 'f_to' is usually stdin of the decompressor process.
         """
 
-        import tarfile
+        chunk_size = 1024*1024
+        while not self._done:
+            buf = f_from.read(chunk_size)
+            if not buf:
+                break
 
-        f_obj = tarfile.open(fileobj=self._f_objs[-1], mode='r|*')
-        self._f_objs.append(f_obj)
+            f_to.write(buf)
 
-        member = self._f_objs[-1].next()
-        self.size = member.size
-        f_obj = self._f_objs[-1].extractfile(member)
-        self._f_objs.append(f_obj)
+        # This will make sure the process decompressor gets EOF and exits, as
+        # well as ublocks processes waiting on decompressor's stdin.
+        f_to.close()
 
     def _open_compressed_file(self):
         """
@@ -304,63 +214,145 @@ class TransRead(object):
         compressed.
         """
 
-        try:
-            if self.name.endswith('.gz') or self.name.endswith('.gzip') \
-                 or self.name.endswith('.tgz'):
-                import zlib
-
-                self.compression_type = 'gzip'
-                decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
-                f_obj = _CompressedFile(self._f_objs[-1],
-                                        decompressor.decompress)
-                self._f_objs.append(f_obj)
-
-                if self.name.endswith('.tar.gz') or self.name.endswith('.tgz'):
-                    self._open_tarfile()
-            elif self.name.endswith('.bz2'):
-                import bz2
-
-                # Let's try to use the bz2file module, which is a backport from
-                # python 3.3 available in PyPI. It supports multiple streams
-                # (pbzip2) and handles handles out-of-memory issues nicely.
-                try:
-                    import bz2file
-
-                    self.bz2file_found = True
-                    f_obj = bz2file.BZ2File(self._f_objs[-1], 'r')
-                except ImportError:
-                    import bz2
-
-                    f_obj = _CompressedFile(self._f_objs[-1],
-                                      bz2.BZ2Decompressor().decompress, 128)
-
-                self.compression_type = 'bzip2'
-                self._f_objs.append(f_obj)
-
-                if self.name.endswith('.tar.bz2'):
-                    self._open_tarfile()
-            elif self.name.endswith('.xz'):
-                try:
-                    import lzma
-                except ImportError:
-                    try:
-                        from backports import lzma # pylint: disable=F0401
-                    except ImportError:
-                        raise Error("cannot import the \"lzma\" python module, "
-                                    "it's required for decompressing .xz files")
-
-                self.compression_type = 'xz'
-                f_obj = _CompressedFile(self._f_objs[-1],
-                                        lzma.LZMADecompressor().decompress, 128)
-                self._f_objs.append(f_obj)
-
-                if self.name.endswith('.tar.xz'):
-                    self._open_tarfile()
+        def is_gzip(name):
+            """Returns 'True' if file 'name' is compressed with 'gzip'."""
+            if name.endswith('.gzip') or \
+               (name.endswith('.gz') and not name.endswith('.tar.gz')):
+                return True
+            return False
+
+        def is_bzip2(name):
+            """Returns 'True' if file 'name' is compressed with 'bzip2'."""
+            if name.endswith('.bz2') and not name.endswith('.tar.bz2'):
+                return True
+            return False
+
+        def is_xz(name):
+            """Returns 'True' if file 'name' is compressed with 'xz'."""
+            if name.endswith('.xz') and not name.endswith('.tar.xz'):
+                return True
+            return False
+
+        def is_lzop(name):
+            """Returns 'True' if file 'name' is compressed with 'lzop'."""
+            if name.endswith('.lzo') and not name.endswith('.tar.lzo'):
+                return True
+            return False
+
+        def is_tar_gz(name):
+            """
+            Returns 'True' if file 'name' is a tar archive compressed with
+            'gzip'.
+            """
+
+            if name.endswith('.tar.gz') or name.endswith('.tgz'):
+                return True
+            return False
+
+        def is_tar_bz2(name):
+            """
+            Returns 'True' if file 'name' is a tar archive compressed with
+            'bzip2'.
+            """
+
+            if name.endswith('.tar.bz2') or name.endswith('.tbz') or \
+               name.endswith('.tbz2') or name.endswith('.tb2'):
+                return True
+            return False
+
+        def is_tar_xz(name):
+            """
+            Returns 'True' if file 'name' is a tar archive compressed with 'xz'.
+            """
+
+            if name.endswith('.tar.xz') or name.endswith('.txz'):
+                return True
+            return False
+
+        def is_tar_lzo(name):
+            """
+            Returns 'True' if file 'name' is a tar archive compressed with
+            'lzop'.
+            """
+
+            if name.endswith('.tar.lzo') or name.endswith('.tzo'):
+                return True
+            return False
+
+        archiver = None
+        if is_tar_gz(self.name) or is_gzip(self.name):
+            self.compression_type = 'gzip'
+            if BmapHelpers.program_is_available("pigz"):
+                decompressor = "pigz"
             else:
-                if not self.is_url:
-                    self.size = os.fstat(self._f_objs[-1].fileno()).st_size
-        except IOError as err:
-            raise Error("cannot open file '%s': %s" % (self.name, err))
+                decompressor = "gzip"
+
+            if is_gzip(self.name):
+                args = "-d -c"
+            else:
+                archiver = "tar"
+                args = "-x -z -O"
+        elif is_tar_bz2(self.name) or is_bzip2(self.name):
+            self.compression_type = 'bzip2'
+            if BmapHelpers.program_is_available("pbzip2"):
+                decompressor = "pbzip2"
+            else:
+                decompressor = "bzip2"
+
+            if is_bzip2(self.name):
+                args = "-d -c"
+            else:
+                archiver = "tar"
+                args = "-x -j -O"
+        elif is_tar_xz(self.name) or is_xz(self.name):
+            self.compression_type = 'xz'
+            decompressor = "xz"
+            if is_xz(self.name):
+                args = "-d -c"
+            else:
+                archiver = "tar"
+                args = "-x -J -O"
+        elif is_tar_lzo(self.name) or is_lzop(self.name):
+            self.compression_type = 'lzo'
+            decompressor = "lzop"
+            if is_lzop(self.name):
+                args = "-d -c"
+            else:
+                archiver = "tar"
+                args = "-x --lzo -O"
+        else:
+            if not self.is_url:
+                self.size = os.fstat(self._f_objs[-1].fileno()).st_size
+            return
+
+        # Make sure decompressor and the archiver programs are available
+        if not BmapHelpers.program_is_available(decompressor):
+            raise Error("the \"%s\" program is not available but it is "
+                        "required decompressing \"%s\""
+                        % (decompressor, self.name))
+        if archiver and not BmapHelpers.program_is_available(archiver):
+            raise Error("the \"%s\" program is not available but it is "
+                        "required reading \"%s\"" % (archiver, self.name))
+
+        # Start the decompressor process. We'll send the data to its stdin and
+        # read the decompressed data from its stdout.
+        if archiver:
+            args = archiver + " " + args
+        else:
+            args = decompressor + " " + args
+        child_process = subprocess.Popen(args, shell=True,
+                                         bufsize=1024*1024,
+                                         stdin=subprocess.PIPE,
+                                         stdout=subprocess.PIPE,
+                                         stderr=subprocess.PIPE)
+
+        args = (self._f_objs[-1], child_process.stdin, )
+        self._rthread = threading.Thread(target=self._read_thread, args=args)
+        self._rthread.start()
+
+        self._force_fake_seek = True
+        self._f_objs.append(child_process.stdout)
+        self._child_processes.append(child_process)
 
     def _open_url_ssh(self, parsed_url):
         """
@@ -427,13 +419,14 @@ class TransRead(object):
                         "permissions" % (path, hostname))
 
         # Read the entire file using 'cat'
-        self._child_process = subprocess.Popen(popen_args + ["cat " + path],
-                                               stdout=subprocess.PIPE)
+        child_process = subprocess.Popen(popen_args + ["cat " + path],
+                                         stdout=subprocess.PIPE)
 
         # Now the contents of the file should be available from sub-processes
         # stdout
-        self._f_objs.append(self._child_process.stdout)
+        self._f_objs.append(child_process.stdout)
 
+        self._child_processes.append(child_process)
         self.is_url = True
         self._force_fake_seek = True
 
@@ -520,22 +513,6 @@ class TransRead(object):
         self.is_url = True
         self._f_objs.append(f_obj)
 
-    def _get_pbzip2_error_string(self):
-        """
-        This is a helper function which returns a string which describes the
-        problem of decompressing multi-stream bzip2 archives.
-        """
-
-        res =  "file \"%res\" is a multi-stream bz2 archive " % self.name
-        res += "(pbzip2) and it is not supported by python 2.x\n"
-        res += "Please, install the \"bz2file\" python library from PyPI to "
-        res += "support multi-stream archives. Here is an example of how this "
-        res += "could be done in Fedora:\n"
-        res += "$ yum install python-pip\n"
-        res += "$ pip install bz2file"
-
-        return res
-
     def read(self, size=-1):
         """
         Read the data from the file or URL and and uncompress it on-the-fly if
@@ -544,33 +521,18 @@ class TransRead(object):
 
         if size < 0:
             size = 0xFFFFFFFFFFFFFFFF
-
-        try:
-            buf = self._f_objs[-1].read(size)
-        except EOFError:
-            if self.compression_type == 'bzip2' and not self.bz2file_found:
-                # The file is probably compressed with 'pbzip2'
-                raise Error(self._get_pbzip2_error_string())
-            else:
-                raise
-
+        buf = self._f_objs[-1].read(size)
         self._pos += len(buf)
+
         return buf
 
     def seek(self, offset, whence=os.SEEK_SET):
         """The 'seek()' method, similar to the one file objects have."""
-        try:
-            if self._force_fake_seek or not hasattr(self._f_objs[-1], "seek"):
-                self._pos = _fake_seek_forward(self._f_objs[-1], self._pos,
-                                               offset, whence)
-            else:
-                self._f_objs[-1].seek(offset, whence)
-        except EOFError:
-            if self.compression_type == 'bzip2' and not self.bz2file_found:
-                # The file is probably compressed with 'pbzip2'
-                raise Error(self._get_pbzip2_error_string())
-            else:
-                raise
+        if self._force_fake_seek or not hasattr(self._f_objs[-1], "seek"):
+            self._pos = _fake_seek_forward(self._f_objs[-1], self._pos,
+                                           offset, whence)
+        else:
+            self._f_objs[-1].seek(offset, whence)
 
     def tell(self):
         """The 'tell()' method, similar to the one file objects have."""
-- 
1.8.3.1




More information about the Bmap-tools mailing list