3

When working with large data files, it may be desirable to output a sequence of bytes in large chunks. This snippet defines a file adapter class to handle this transparently. Writing bytes to a ChunkedOutputFile will automatically write to the underlying file object in fixed-length chunks.

Edited by Gribouillis: n/a

Comments
looks useful
#!/usr/bin/env python
# -*-coding: utf8-*-
# Title: writechunks.py
# Author: Gribouillis for the python forum at www.daniweb.com
# Created: 2012-03-18 11:49:50.489168 (isoformat date)
# License: Public Domain
# Use this code freely.

import mmap
import platform
import sys

# Module version, both as a tuple and as a dotted string.
version_info = (0, 1)
version = ".".join(str(part) for part in version_info)

# Size units expressed in bytes; multiply them to spell out chunk sizes
# (for example 32 * MB).
_K = 1024
B = 1
KB = B * _K
MB = KB * _K
GB = MB * _K

def new_mmap(chunk_size):
    """Return a new anonymous memory map holding chunk_size bytes.

    mmap.mmap() takes different positional arguments on Windows and on
    other systems, hence the platform test.
    """
    if platform.system() != 'Windows':
        protection = mmap.PROT_READ | mmap.PROT_WRITE
        return mmap.mmap(-1, chunk_size, mmap.MAP_PRIVATE, protection)
    return mmap.mmap(-1, chunk_size, None, mmap.ACCESS_WRITE)

        
def chunked(iterable, chunk_size):
    """Adapter to convert a sequence of strings to another sequence of
    strings with fixed length (the last string may be smaller).

    The items of iterable are copied into a buffer of chunk_size bytes
    obtained from new_mmap(); every time the buffer is full its content is
    yielded and the buffer is reused. The buffer is closed when the
    generator finishes or is closed.

    NOTE: on Python 3 the items must be bytes-like objects (mmap.write()
    does not accept str) and the yielded chunks are bytes.

    Example:
    
        >>> data = ["give ", "me ", "bacon ", "and ", "eggs ", "said ", "the ", "other ", "man."]
        >>> for s in chunked(data, 9):
        ...     print(s)
        ...
        give me b
        acon and 
        eggs said
         the othe
        r man.

    """
    buf = new_mmap(chunk_size)
    try:
        tell, seek, write = buf.tell, buf.seek, buf.write
        for chars in iterable:
            start, total = 0, len(chars)
            room = chunk_size - tell()
            # The free space is compared explicitly instead of catching the
            # ValueError raised by an overflowing mmap.write(), so that a
            # ValueError coming from the iterable itself is never mistaken
            # for a full buffer.
            while total - start > room:
                if room:
                    write(chars[start:start + room])
                    start += room
                yield buf[:]
                seek(0)
                room = chunk_size
            if start < total:
                write(chars[start:] if start else chars)
        # Whatever is left forms the last, possibly shorter, chunk.
        if tell():
            yield buf[:tell()]
    finally:
        # Release the map even if the consumer abandons the generator early.
        buf.close()

class ChunkedOutputFile(object):
    """Adapter class to write an output file by chunks of fixed length.

    Data passed to write() or writelines() is accumulated in a buffer of
    chunk_size bytes obtained from new_mmap(). Each time the buffer is full
    it is handed to the underlying file object in a single write() call;
    the shorter remainder is written by flush(), seek() or close().

    Typical use:
    
        # This code writes a sequence of strings to myfile.txt by chunks of 32 megabytes.
    
        with open("myfile.txt", "w") as handle:
            with ChunkedOutputFile(handle, 32 * MB) as ofh:
                for string in data_source():
                    ofh.write(string)

    NOTE: on Python 3 the mmap buffer only accepts bytes-like data and
    passes bytes on to the underlying file, so that file has to be opened
    in binary mode.

    Many file methods are left unimplemented in this class. They could be implemented in subclasses.
        
    """

    # Becomes True once close() has run, which makes further calls no-ops.
    _closed = False

    def __init__(self, ofh, chunk_size):
        """Wrap the file object ofh with a buffer of chunk_size bytes."""
        self.ofh = ofh
        self.chunk_size = chunk_size
        self.map = new_mmap(chunk_size)

    def _drain(self):
        """Write the buffered bytes, if any, to the file and rewind the buffer."""
        pending = self.map.tell()
        if pending:
            self.ofh.write(self.map[:pending])
            self.map.seek(0)

    def close(self):
        """Flush the buffered data, then close the buffer and the file.

        Calling close() more than once is harmless. The buffer and the
        underlying file are closed even when flushing raises; the exception
        then propagates to the caller.
        """
        if self._closed:
            return
        self._closed = True
        try:
            self.flush()
        finally:
            try:
                self.map.close()
            finally:
                self.ofh.close()

    def fileno(self):
        """Return the file descriptor of the underlying file."""
        return self.ofh.fileno()

    def isatty(self):
        """Tell whether the underlying file is a tty."""
        return self.ofh.isatty()

    def flush(self):
        """Write the buffered data (a possibly short chunk) and flush the file."""
        self._drain()
        self.ofh.flush()

    def next(self):
        raise NotImplementedError

    def read(self, size=-1):
        raise NotImplementedError

    def readline(self, size=-1):
        raise NotImplementedError

    def readlines(self, sizehint=None):
        raise NotImplementedError

    def seek(self, offset, whence=0):
        """Write the buffered data, then reposition the underlying file."""
        self._drain()
        self.ofh.seek(offset, whence)

    def tell(self):
        """Return the file position plus the number of bytes still buffered."""
        return self.ofh.tell() + self.map.tell()

    def truncate(self, size):
        raise NotImplementedError

    def write(self, chars):
        """Append chars to the buffer, writing every chunk that gets full.

        A buffer that is exactly full is kept until more data arrives or
        until flush(), seek() or close() is called.
        """
        buf = self.map
        start, total = 0, len(chars)
        room = self.chunk_size - buf.tell()
        # An offset is advanced instead of re-slicing the remainder, so
        # large inputs are not copied again for every chunk.
        while total - start > room:
            if room:
                buf.write(chars[start:start + room])
                start += room
            self.ofh.write(buf[:])
            buf.seek(0)
            room = self.chunk_size
        if start < total:
            buf.write(chars[start:] if start else chars)

    def writelines(self, sequence):
        """Write every item of sequence, exactly as repeated write() calls.

        Like write(), this leaves an incomplete chunk in the buffer, so the
        underlying file only receives short chunks from flush(), seek() or
        close().
        """
        write = self.write
        for chars in sequence:
            write(chars)

    @property
    def closed(self):
        return self.ofh.closed

    @property
    def encoding(self):
        raise NotImplementedError

    @property
    def errors(self):
        raise NotImplementedError

    @property
    def mode(self):
        return self.ofh.mode

    @property
    def name(self):
        return self.ofh.name

    @property
    def newlines(self):
        return self.ofh.newlines

    def _get_softspace(self):
        return self.ofh.softspace
    def _set_softspace(self, value):
        self.ofh.softspace = value
    # softspace is forwarded to the underlying file in both directions.
    softspace = property(_get_softspace, _set_softspace)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

if __name__ == "__main__":
    # Self-test: cut this very source file into chunks and check the result.
    # The file is read in binary mode because the mmap buffers hold bytes
    # (on Python 3, mmap.write() rejects str).
    with open(__file__, "rb") as source:
        test_str = source.read()
    data = test_str.split()
    chunk = 9
    L = list(chunked(data, chunk))

    def check_list(alist, chunk_size, expected):
        """Check that all chunks but the last have chunk_size bytes and
        that the chunks concatenate to expected."""
        assert all(len(x) == chunk_size for x in alist[:-1])
        assert b''.join(alist) == expected

    check_list(L, chunk, b''.join(data))

    class _MockFile(list):
        """File stand-in which records each write() argument as a list item."""

        def write(self, chars):
            self.append(chars)

        def close(self):
            pass

        def flush(self):
            pass

    # Floor division keeps the chunk size an int on Python 3 as well.
    chunk = 3 * KB // 2

    with open(__file__, "rb") as ifh:
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            for line in ifh:
                ofh.write(line)
        check_list(ofh.ofh, chunk, test_str)
        ifh.seek(0)
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            ofh.writelines(ifh)
        check_list(ofh.ofh, chunk, test_str)
2
Contributors
4
Replies
20
Views
4 Years
Discussion Span
Last Post by Gribouillis
0

I missed an optimizing trick in write(). Here is the improved version

#!/usr/bin/env python
# -*-coding: utf8-*-
# Title: writechunks.py
# Author: Gribouillis for the python forum at www.daniweb.com
# Created: 2012-03-18 11:49:50.489168 (isoformat date)
# License: Public Domain
# Use this code freely.

import mmap
import platform
import sys

# Module version, both as a tuple and as a dotted string.
version_info = (0, 2)
version = ".".join(str(part) for part in version_info)

# Size units expressed in bytes; multiply them to spell out chunk sizes
# (for example 32 * MB).
_K = 1024
B = 1
KB = B * _K
MB = KB * _K
GB = MB * _K

# mmap.mmap() takes different positional arguments on Windows and on other
# systems, so the matching definition is selected once, at import time.
if platform.system() != 'Windows':
    def new_mmap(chunk_size):
        """Return a new private, readable and writable anonymous map of chunk_size bytes."""
        protection = mmap.PROT_READ | mmap.PROT_WRITE
        return mmap.mmap(-1, chunk_size, mmap.MAP_PRIVATE, protection)
else:
    def new_mmap(chunk_size):
        """Return a new writable anonymous map of chunk_size bytes."""
        return mmap.mmap(-1, chunk_size, None, mmap.ACCESS_WRITE)
        
def chunked(iterable, chunk_size):
    """Adapter to convert a sequence of strings to another sequence of
    strings with fixed length (the last string may be smaller).

    The items of iterable are copied into a buffer of chunk_size bytes
    obtained from new_mmap(); every time the buffer is full its content is
    yielded and the buffer is reused. The buffer is closed when the
    generator finishes or is closed.

    NOTE: on Python 3 the items must be bytes-like objects (mmap.write()
    does not accept str) and the yielded chunks are bytes.

    Example:
    
        >>> data = ["give ", "me ", "bacon ", "and ", "eggs ", "said ", "the ", "other ", "man."]
        >>> for s in chunked(data, 9):
        ...     print(s)
        ...
        give me b
        acon and 
        eggs said
         the othe
        r man.

    """
    buf = new_mmap(chunk_size)
    try:
        tell, seek, write = buf.tell, buf.seek, buf.write
        for chars in iterable:
            start, total = 0, len(chars)
            room = chunk_size - tell()
            # The free space is compared explicitly instead of catching the
            # ValueError raised by an overflowing mmap.write(), so that a
            # ValueError coming from the iterable itself is never mistaken
            # for a full buffer.
            while total - start > room:
                if room:
                    write(chars[start:start + room])
                    start += room
                yield buf[:]
                seek(0)
                room = chunk_size
            if start < total:
                write(chars[start:] if start else chars)
        # Whatever is left forms the last, possibly shorter, chunk.
        if tell():
            yield buf[:tell()]
    finally:
        # Release the map even if the consumer abandons the generator early.
        buf.close()

class ChunkedOutputFile(object):
    """Adapter class to write an output file by chunks of fixed length.

    Data passed to write() or writelines() is accumulated in a buffer of
    chunk_size bytes obtained from new_mmap(). Each time the buffer is full
    it is handed to the underlying file object in a single write() call;
    the shorter remainder is written by flush(), seek() or close().

    Typical use:
    
        # This code writes a sequence of strings to myfile.txt by chunks of 32 megabytes.
    
        with open("myfile.txt", "w") as handle:
            with ChunkedOutputFile(handle, 32 * MB) as ofh:
                for string in data_source():
                    ofh.write(string)

    NOTE: on Python 3 the mmap buffer only accepts bytes-like data and
    passes bytes on to the underlying file, so that file has to be opened
    in binary mode.

    Many file methods are left unimplemented in this class. They could be implemented in subclasses.
        
    """

    # Becomes True once close() has run, which makes further calls no-ops.
    _closed = False

    def __init__(self, ofh, chunk_size):
        """Wrap the file object ofh with a buffer of chunk_size bytes."""
        self.ofh = ofh
        self.chunk_size = chunk_size
        self.map = new_mmap(chunk_size)

    def _drain(self):
        """Write the buffered bytes, if any, to the file and rewind the buffer."""
        pending = self.map.tell()
        if pending:
            self.ofh.write(self.map[:pending])
            self.map.seek(0)

    def close(self):
        """Flush the buffered data, then close the buffer and the file.

        Calling close() more than once is harmless. The buffer and the
        underlying file are closed even when flushing raises; the exception
        then propagates to the caller.
        """
        if self._closed:
            return
        self._closed = True
        try:
            self.flush()
        finally:
            try:
                self.map.close()
            finally:
                self.ofh.close()

    def fileno(self):
        """Return the file descriptor of the underlying file."""
        return self.ofh.fileno()

    def isatty(self):
        """Tell whether the underlying file is a tty."""
        return self.ofh.isatty()

    def flush(self):
        """Write the buffered data (a possibly short chunk) and flush the file."""
        self._drain()
        self.ofh.flush()

    def next(self):
        raise NotImplementedError

    def read(self, size=-1):
        raise NotImplementedError

    def readline(self, size=-1):
        raise NotImplementedError

    def readlines(self, sizehint=None):
        raise NotImplementedError

    def seek(self, offset, whence=0):
        """Write the buffered data, then reposition the underlying file."""
        self._drain()
        self.ofh.seek(offset, whence)

    def tell(self):
        """Return the file position plus the number of bytes still buffered."""
        return self.ofh.tell() + self.map.tell()

    def truncate(self, size):
        raise NotImplementedError

    def write(self, chars):
        """Append chars to the buffer, writing every chunk that gets full.

        A buffer that is exactly full is kept until more data arrives or
        until flush(), seek() or close() is called.
        """
        buf = self.map
        start, total = 0, len(chars)
        room = self.chunk_size - buf.tell()
        # An offset is advanced instead of re-slicing the remainder, so
        # large inputs are not copied again for every chunk.
        while total - start > room:
            if room:
                buf.write(chars[start:start + room])
                start += room
            self.ofh.write(buf[:])
            buf.seek(0)
            room = self.chunk_size
        if start < total:
            buf.write(chars[start:] if start else chars)

    def writelines(self, sequence):
        """Write every item of sequence, exactly as repeated write() calls.

        Like write(), this leaves an incomplete chunk in the buffer, so the
        underlying file only receives short chunks from flush(), seek() or
        close().
        """
        write = self.write
        for chars in sequence:
            write(chars)

    @property
    def closed(self):
        return self.ofh.closed

    @property
    def encoding(self):
        raise NotImplementedError

    @property
    def errors(self):
        raise NotImplementedError

    @property
    def mode(self):
        return self.ofh.mode

    @property
    def name(self):
        return self.ofh.name

    @property
    def newlines(self):
        return self.ofh.newlines

    def _get_softspace(self):
        return self.ofh.softspace
    def _set_softspace(self, value):
        self.ofh.softspace = value
    # softspace is forwarded to the underlying file in both directions.
    softspace = property(_get_softspace, _set_softspace)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

if __name__ == "__main__":
    # Self-test: cut this very source file into chunks and check the result.
    # The file is read in binary mode because the mmap buffers hold bytes
    # (on Python 3, mmap.write() rejects str).
    with open(__file__, "rb") as source:
        test_str = source.read()
    data = test_str.split()
    chunk = 9
    L = list(chunked(data, chunk))

    def check_list(alist, chunk_size, expected):
        """Check that all chunks but the last have chunk_size bytes and
        that the chunks concatenate to expected."""
        assert all(len(x) == chunk_size for x in alist[:-1])
        assert b''.join(alist) == expected

    check_list(L, chunk, b''.join(data))

    class _MockFile(list):
        """File stand-in which records each write() argument as a list item."""

        def write(self, chars):
            self.append(chars)

        def close(self):
            pass

        def flush(self):
            pass

    # Floor division keeps the chunk size an int on Python 3 as well.
    chunk = 3 * KB // 2

    with open(__file__, "rb") as ifh:
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            for line in ifh:
                ofh.write(line)
        check_list(ofh.ofh, chunk, test_str)
        ifh.seek(0)
        with ChunkedOutputFile(_MockFile(), chunk) as ofh:
            ofh.writelines(ifh)
        check_list(ofh.ofh, chunk, test_str)

Edited by Gribouillis: n/a

0

This snippet could probably be a good foundation for file copy/installation with a progress bar, by adding a function to call for each block. It could take parameters for the current block number and the total number of blocks.

0

What is the benefit of using this snippet compared to io module's buffered io?

You are right, there may be no advantage at all. I didn't think of that when I wrote the snippet.

Have something to contribute to this discussion? Please be thoughtful, detailed and courteous, and be sure to adhere to our posting rules.