Adapt an iterable to look like a file.

Gribouillis 2 Tallied Votes 1K Views Share

Any sequence of Python strings can be disguised to look like a file opened for reading, which can then be passed on the fly to functions expecting such an input file. This code snippet defines the necessary adapter class. The resulting file-like objects store only one string of the sequence at a time, which limits some file operations (seek() doesn't work backwards, for example). This code was inspired by the standard library's StringIO module. Please note that when memory or disk access is not an issue, it may be more efficient to write a temporary file than to use this adapter class (for example, on Linux one could write a file in /dev/shm).

# Name: module adaptstrings.py
# Date: october 29 2011
# Author: Gribouillis for the python forum at www.daniweb.com
# License: public domain
# Use this code freely in your programs

"""Module to adapt sequences of strings to a file-like interface.
"""

# Public API: only the two adapter helpers are exported by a star-import.
__all__ = ["adapt_as_file", "adapt_as_opener"]

def adapt_as_file(iterable):
    """Wrap an iterable of strings so that it can be read like a file.

    The items of *iterable* are assumed to be strings. The returned
    object is an IterableAsFileAdapter, which offers the methods close(),
    read(), readline(), readlines(), seek() and tell(); refer to that
    class for the details of each method.

    typical usage:

        f = adapt_as_file(sequence_of_strings)
        for line in f:
            ...
            # use f as if it were a file opened for reading
    """
    adapter = IterableAsFileAdapter(iterable)
    return adapter

def adapt_as_opener(func):
    """Decorator turning a string generator function into a file opener.

    The decorated function accepts the same arguments as *func*; each call
    runs *func* and passes its result to adapt_as_file(), so the caller
    receives a file-like object opened for reading instead of the raw
    iterable. The metadata of *func* (name, docstring, ...) is copied onto
    the returned wrapper.

    typical usage:

        @adapt_as_opener
        def data_source(args):
            yield a_string
            yield another_string

        f = data_source(args)
        for line in f:
            ...
            # use f as if it were a file opened for reading
    """
    # Imported here because this module has no top-level functools import;
    # without it the decorator failed with a NameError on first use.
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwd):
        return adapt_as_file(func(*args, **kwd))
    return wrapper


def _complain_ifclosed(closed):
    if closed:
        raise ValueError, "I/O operation on closed file"

# This class implementation was inspired by the pure python implementation
# of the StringIO class in the python standard library

class IterableAsFileAdapter(object):
    """Read-only, forward-only file-like object fed by an iterable of strings.

    Only one item of the iterable is kept in memory at a time:

    * ``buf`` is the item currently being read ('' initially and once the
      source is exhausted),
    * ``start`` and ``end`` are the absolute offsets of the beginning and
      of the end of ``buf`` (``start + len(buf) == end``),
    * ``pos`` is the current absolute position (``start <= pos <= end``),
    * ``source`` is the iterator over the remaining items, or None once it
      is exhausted or the object is closed.
    """

    def __init__(self, iterable):
        """Wrap *iterable*, whose items are assumed to be strings."""
        self.source = iter(iterable)
        self.start = self.pos = self.end = 0
        self.buf = ''
        self._closed = False

    def is_sane(self):
        """Return a bool indicating that the file-like instance satisfies the
        class invariants defined in its implementation. A False value
        should not happen, it would mean an internal implementation error.
        """
        return bool(self._closed or (
            (self.start <= self.pos <= self.end)
            and (self.start + len(self.buf) == self.end)
            and ((self.source is not None) or self.buf == '')
            ))

    def __iter__(self):
        """Return self: the object is its own line iterator."""
        return self

    def next(self):
        """Return the next input line, or raise StopIteration at EOF.

        This is what drives ``for line in f`` loops; it is a thin wrapper
        around readline().
        """
        r = self.readline()
        if not r:
            raise StopIteration
        return r
    # Python 3 spells the iterator protocol method __next__; without this
    # alias the object cannot be used in a for loop there.
    __next__ = next

    def __enter__(self):
        """Support the ``with`` statement; return self."""
        return self

    def __exit__(self, *args):
        """Close the object when leaving a ``with`` block."""
        self.close()

    @property
    def closed(self):
        """bool indicating the current state of the file-like object. This is a
        read-only attribute; the close() method changes the value."""
        return self._closed

    def close(self):
        """Mark the object closed and drop the buffer and the source.

        Calling close() more than once is allowed.
        """
        if not self._closed:
            self._closed, self.buf, self.source = True, '', None
            self.start = self.pos = self.end = 0

    def isatty(self):
        """Return False. Raise ValueError if the object is closed."""
        _complain_ifclosed(self._closed)
        return False

    def _go_next(self):
        """Advance the file position to the beginning of the next string
        in the iterable source. For internal use only.

        When the source is exhausted, ``source`` becomes None and the
        buffer is left empty.
        """
        self.start = self.pos = self.end
        self.buf = ''
        if self.source is not None:
            try:
                self.buf = next(self.source)
                self.end += len(self.buf)
            except StopIteration:
                self.source = None

    def seek(self, where, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end). This adapter
        class only supports seeking to positions at or beyond the current
        file position and does not support mode 2. A ValueError is raised
        when these conditions fail or when the object is closed. Seeking
        past the end of the data leaves the position at EOF.

        There is no return value.
        """
        _complain_ifclosed(self._closed)
        if mode == 2:
            raise ValueError("File-like object does not support seek() from eof.")
        elif mode == 1:
            where += self.pos # use absolute positioning
        if where < self.pos:
            raise ValueError("File-like object does not support backward seek().")
        while where > self.end:
            self._go_next()
            if self.source is None:
                # source exhausted: _go_next() left pos at EOF
                return
        self.pos = where

    def tell(self):
        """Return the file's current position."""
        _complain_ifclosed(self._closed)
        return self.pos

    def read(self, n = -1):
        """Read at most n characters from the file
        (less if the read hits EOF before obtaining n characters).

        If the argument is negative, None or omitted, read all data until
        EOF is reached. The data is returned as a string object. An empty
        string is returned when EOF is encountered immediately.
        Raise ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)
        if n is None:
            # None means "no limit"; comparing None with 0 fails on Python 3
            n = -1
        L = list()
        if n < 0:
            L.append(self.buf[self.pos-self.start:])
            while self.source is not None:
                self._go_next()
                L.append(self.buf)
        else:
            where = self.pos + n
            while where > self.end:
                L.append(self.buf[self.pos-self.start:])
                self._go_next()
                if self.source is None:
                    break
            else:
                # the target position lies inside the current buffer
                L.append(self.buf[self.pos-self.start:where-self.start])
                self.pos = where
        return ''.join(L)

    def readline(self, length = None):
        r"""Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the length argument is
        present and non-negative, it is a maximum character count (including
        the trailing newline) and an incomplete line may be returned; in
        particular readline(0) returns ''.

        An empty string is returned when EOF is encountered immediately.
        Raise ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)
        # ``stop`` is the absolute offset where a limited read must end;
        # -1 stands for "no limit". A limit of 0 is a real limit.
        if length is not None and length >= 0:
            stop = self.pos + length
        else:
            stop = -1
        L = list()
        while True:
            where = self.buf.find('\n', self.pos-self.start)
            if where < 0: # newline not found in buffer
                if self.pos <= stop <= self.end:
                    # the limit falls inside the current buffer
                    L.append(self.buf[self.pos-self.start:stop-self.start])
                    self.pos = stop
                    break
                else:
                    L.append(self.buf[self.pos-self.start:])
                    self._go_next()
                    if self.source is None:
                        break
            else:
                where += 1 + self.start # absolute offset just past the newline
                if 0 <= stop < where:
                    where = stop
                L.append(self.buf[self.pos-self.start:where-self.start])
                self.pos = where
                break
        return ''.join(L)

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is positive, reading stops as soon
        as the lines read total at least sizehint characters instead of
        continuing up to EOF. A sizehint of 0 or None means no limit.
        """
        if sizehint is None:
            # None means "no limit"; comparing None with 0 fails on Python 3
            sizehint = 0
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def flush(self):
        """This is a no-op for this file-like object, apart from raising
        ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)

    @property
    def name(self):
        """Pseudo file name built from the class name, e.g.
        '<IterableAsFileAdapter>'."""
        return "<%s>" % self.__class__.__name__

# test code

def test():
    """Self-test of IterableAsFileAdapter.

    The sample text is read from the file named by the first command line
    argument (default '/etc/passwd'), replaced by a synthetic line if the
    file is empty and doubled until it holds at least 10000 characters.
    The text is then cut into pseudo-random pieces of 10 to 120 characters
    which are fed to the adapter; read(), seek(), tell(), readlines() and
    iteration are checked with assertions.
    """
    import sys
    path = sys.argv[1] if sys.argv[1:] else '/etc/passwd'
    # Close the sample file deterministically instead of leaking the handle.
    with open(path, 'r') as sample:
        text = sample.read(100000)
    if not len(text):
        text = "x"*70 + "\n"
    while len(text) < 10000:
        text += text
    # read(100000) may return exactly 100000 characters, hence "<=".
    assert 10000 <= len(text) <= 100000

    def sample_iterable():
        # fixed seed: the same pieces are produced on every call
        from random import Random
        ra = Random()
        ra.seed(12345678901234567890)
        i = 0
        while i < len(text):
            d = ra.randint(10, 120)
            yield text[i: i+d]
            i += d
    def new():
        return IterableAsFileAdapter(sample_iterable())

    # read everything at once
    assert new().read() == text

    # read by fixed-size blocks
    f = new()
    x = list()
    while True:
        x.append(f.read(31))
        assert f.is_sane()
        if len(x[-1]) != 31:
            assert f.tell() == len(text)
            break
    assert f.tell() == len(text)
    assert ''.join(x) == text

    # relative forward seeks
    f = new()
    while True:
        p = f.tell()
        f.seek(47, 1)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # absolute forward seeks
    f = new()
    while True:
        p = f.tell()
        f.seek(p+47)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # line splitting must agree with str.split
    f = new()
    U, V = f.readlines(), text.split('\n')
    if not V[-1]:
        del V[-1]
    # The sample text may end with an incomplete line (no trailing newline);
    # normalize it so that the comparison below holds for the last line too.
    if not U[-1].endswith('\n'):
        U[-1] += '\n'
    assert len(U) == len(V)
    assert all(x == y + '\n' for x, y in zip(U, V))

    # context manager and iteration protocols
    with new() as f:
        for line in f:
            pass

# Run the self-test when the module is executed as a script.
if __name__ == "__main__":
    test()
Gribouillis 1,391 Programming Explorer Team Colleague

Bugfix release with small improvements, compatible with both Python 2 and Python 3:

# Name: module adaptstrings.py
# Date: october 29 2011
# Author: Gribouillis for the python forum at www.daniweb.com
# License: public domain
# Use this code freely in your programs

"""Module to adapt sequences of strings to a file-like interface.
"""
from functools import update_wrapper
import sys
if sys.version_info < (3,):
    from itertools import ifilter as filter
    
# Release number of this module as a (major, minor) tuple.
version_info = (1, 0)

# Public API: only the two adapter helpers are exported by a star-import.
__all__ = ["adapt_as_file", "adapt_as_opener"]

def adapt_as_file(iterable):
    """Wrap an iterable of strings so that it can be read like a file.

    The items of *iterable* are assumed to be strings. The returned
    object is an IterableAsFileAdapter, which offers the methods close(),
    read(), readline(), readlines(), seek() and tell(); refer to that
    class for the details of each method.

    typical usage:

        f = adapt_as_file(sequence_of_strings)
        for line in f:
            ...
            # use f as if it was a file opened for reading
    """
    adapter = IterableAsFileAdapter(iterable)
    return adapter

def adapt_as_opener(func):
    """Decorator turning a string generator function into a file opener.

    The decorated function accepts the same arguments as *func*; each call
    runs *func* and hands its result to adapt_as_file(), so the caller
    receives a file-like object opened for reading. The metadata of *func*
    is copied onto the returned function.

    typical usage:

        @adapt_as_opener
        def data_source(args):
            yield a_string
            yield another_string

        f = data_source(args)
        for line in f:
            ...
            # use f as if it was a file opened for reading
    """
    def opener(*args, **kwd):
        strings = func(*args, **kwd)
        return adapt_as_file(strings)
    # update_wrapper() returns the function it has just updated
    return update_wrapper(opener, func)


def _complain_ifclosed(closed):
    if closed:
        raise ValueError("I/O operation on closed file")
    
# This class implementation was inspired by the pure python implementation
# of the StringIO class in the python standard library

class IterableAsFileAdapter(object):
    """Read-only, forward-only file-like object fed by an iterable of strings.

    Empty items of the iterable are skipped. Only one item is kept in
    memory at a time:

    * ``buf`` is the item currently being read ('' initially and once the
      source is exhausted),
    * ``start`` and ``end`` are the absolute offsets of the beginning and
      of the end of ``buf`` (``start + len(buf) == end``),
    * ``pos`` is the current absolute position (``start <= pos <= end``),
    * ``source`` is the iterator over the remaining items, or None once it
      is exhausted or the object is closed.
    """

    def __init__(self, iterable):
        """Wrap *iterable*, whose items are assumed to be strings."""
        # filter(None, ...) drops the empty strings of the iterable
        self.source = filter(None, iterable)
        self.start = self.pos = self.end = 0
        self.buf = ''
        self._closed = False

    def is_sane(self):
        """Return a bool indicating that the file-like instance satisfies the
        class invariants defined in its implementation. A False value
        should not happen, it would mean an internal implementation error.
        """
        return bool(self._closed or (
            (self.start <= self.pos <= self.end)
            and (self.start + len(self.buf) == self.end)
            and ((self.source is not None) or self.buf == '')
            ))

    def __iter__(self):
        """Return self: the object is its own line iterator."""
        return self

    def next(self):
        """Return the next input line, or raise StopIteration at EOF.

        This is what drives ``for line in f`` loops; it is a thin wrapper
        around readline().
        """
        r = self.readline()
        if not r:
            raise StopIteration
        return r
    # Python 3 name of the iterator protocol method
    __next__ = next

    def __enter__(self):
        """Support the ``with`` statement; return self."""
        return self

    def __exit__(self, *args):
        """Close the object when leaving a ``with`` block."""
        self.close()

    @property
    def closed(self):
        """bool indicating the current state of the file-like object. This is a
        read-only attribute; the close() method changes the value."""
        return self._closed

    def close(self):
        """Mark the object closed and drop the buffer and the source.

        Calling close() more than once is allowed.
        """
        if not self._closed:
            self._closed, self.buf, self.source = True, '', None
            self.start = self.pos = self.end = 0

    def isatty(self):
        """Return False. Raise ValueError if the object is closed."""
        _complain_ifclosed(self._closed)
        return False

    def _go_next(self):
        """Advance the file position to the beginning of the next string
        in the iterable source. For internal use only.

        When the source is exhausted, ``source`` becomes None and the
        buffer is left empty. Exceptions raised by the source itself
        (TypeError included) are propagated to the caller instead of being
        mistaken for the end of the data.
        """
        self.start = self.pos = self.end
        if self.source is None:
            # already at EOF (or closed): nothing left to fetch
            self.buf = ''
            return
        try:
            chunk = next(self.source)
        except StopIteration:
            self.source, self.buf = None, ''
        else:
            self.buf = chunk
            self.end += len(chunk)

    def seek(self, where, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end). This adapter
        class only supports seeking to positions at or beyond the current
        file position and does not support mode 2. A ValueError is raised
        when these conditions fail or when the object is closed. Seeking
        past the end of the data leaves the position at EOF.

        There is no return value.
        """
        _complain_ifclosed(self._closed)
        if mode == 2:
            raise ValueError("File-like object does not support seek() from eof.")
        elif mode == 1:
            where += self.pos # use absolute positioning
        if where < self.pos:
            raise ValueError("File-like object does not support backward seek().")
        while where > self.end:
            self._go_next()
            if self.source is None:
                # source exhausted: _go_next() left pos at EOF
                return
        self.pos = where

    def tell(self):
        """Return the file's current position."""
        _complain_ifclosed(self._closed)
        return self.pos

    def read(self, size = -1):
        """Read at most size characters from the file
        (less if the read hits EOF before obtaining size characters).

        If the size argument is negative, None or omitted, read all data
        until EOF is reached. The data is returned as a string object. An
        empty string is returned when EOF is encountered immediately.
        Raise ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)
        if self.source is None:
            return ''
        if size is None:
            # None means "no limit"; comparing None with 0 fails on Python 3
            size = -1
        L = list()
        if size < 0:
            # rest of the current buffer, then everything left in the source
            L.append(self.buf[self.pos-self.start:])
            L.extend(self.source)
            self.source, self.buf = None, ''
            s = ''.join(L)
            self.start = self.pos = self.end = self.pos + len(s)
            return s
        else:
            where = self.pos + size
            while where > self.end:
                L.append(self.buf[self.pos-self.start:])
                self._go_next()
                if self.source is None:
                    break
            else:
                # the target position lies inside the current buffer
                L.append(self.buf[self.pos-self.start:where-self.start])
                self.pos = where
            return ''.join(L)

    def readline(self, length = None):
        r"""Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the length argument is
        present and non-negative, it is a maximum character count (including
        the trailing newline) and an incomplete line may be returned; in
        particular readline(0) returns ''.

        An empty string is returned when EOF is encountered immediately.
        Raise ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)
        # ``stop`` is the absolute offset where a limited read must end;
        # -1 stands for "no limit". A limit of 0 is a real limit.
        if length is not None and length >= 0:
            stop = self.pos + length
        else:
            stop = -1
        L = list()
        while True:
            where = self.buf.find('\n', self.pos-self.start)
            if where < 0: # newline not found in buffer
                if self.pos <= stop <= self.end:
                    # the limit falls inside the current buffer
                    L.append(self.buf[self.pos-self.start:stop-self.start])
                    self.pos = stop
                    break
                else:
                    L.append(self.buf[self.pos-self.start:])
                    self._go_next()
                    if self.source is None:
                        break
            else:
                where += 1 + self.start # absolute offset just past the newline
                if 0 <= stop < where:
                    where = stop
                L.append(self.buf[self.pos-self.start:where-self.start])
                self.pos = where
                break
        return ''.join(L)

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is positive, reading stops as soon
        as the lines read total at least sizehint characters instead of
        continuing up to EOF. A sizehint of 0 or None means no limit.
        """
        if sizehint is None:
            # None means "no limit"; comparing None with 0 fails on Python 3
            sizehint = 0
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def flush(self):
        """This is a no-op for this file-like object, apart from raising
        ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)

    @property
    def name(self):
        """Pseudo file name built from the class name, e.g.
        '<IterableAsFileAdapter>'."""
        return "<%s>" % self.__class__.__name__

# test code

def test():
    """Self-test of IterableAsFileAdapter.

    The sample text is read from the file named by the first command line
    argument (default '/etc/passwd'), replaced by a synthetic line if the
    file is empty and doubled until it holds at least 10000 characters.
    The text is then cut into pseudo-random pieces of 10 to 120 characters
    which are fed to the adapter; read(), seek(), tell(), readlines() and
    iteration are checked with assertions.
    """
    import sys
    path = sys.argv[1] if sys.argv[1:] else '/etc/passwd'
    # Close the sample file deterministically instead of leaking the handle.
    with open(path, 'r') as sample:
        text = sample.read(100000)
    if not len(text):
        text = "x"*70 + "\n"
    while len(text) < 10000:
        text += text
    assert 10000 <= len(text) <= 100000

    def sample_iterable():
        # fixed seed: the same pieces are produced on every call
        from random import Random
        ra = Random()
        ra.seed(12345678901234567890)
        i = 0
        while i < len(text):
            d = ra.randint(10, 120)
            yield text[i: i+d]
            i += d
    def new():
        return IterableAsFileAdapter(sample_iterable())

    # read everything at once
    assert new().read() == text

    # read by fixed-size blocks
    f = new()
    x = list()
    while True:
        x.append(f.read(31))
        assert f.is_sane()
        if len(x[-1]) != 31:
            assert f.tell() == len(text)
            break
    assert f.tell() == len(text)
    assert ''.join(x) == text

    # relative forward seeks
    f = new()
    while True:
        p = f.tell()
        f.seek(47, 1)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # absolute forward seeks
    f = new()
    while True:
        p = f.tell()
        f.seek(p+47)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # line splitting must agree with str.split
    f = new()
    U, V = f.readlines(), text.split('\n')
    if not V[-1]:
        del V[-1]
    # the text may end with an incomplete line: normalize before comparing
    if not U[-1].endswith('\n'):
        U[-1] += '\n'
    assert len(U) == len(V)
    assert all(x == y + '\n' for x, y in zip(U, V))

    # context manager and iteration protocols
    with new() as f:
        for line in f:
            pass

# Run the self-test when the module is executed as a script.
if __name__ == "__main__":
    test()
Gribouillis 1,391 Programming Explorer Team Colleague

A new release with a redesigned and improved readline() method (the previous version violated the method's contract when a size argument was passed).

# Name: module adaptstrings.py
# Date: october 29 2011
# Author: Gribouillis for the python forum at www.daniweb.com
# License: public domain
# Use this code freely in your programs

"""Module to adapt sequences of strings to a file-like interface.
"""
from functools import update_wrapper
import sys
if sys.version_info < (3,):
    from itertools import ifilter as filter
    
# Release number of this module as a (major, minor) tuple.
version_info = (1, 1)

# Public API: only the two adapter helpers are exported by a star-import.
__all__ = ["adapt_as_file", "adapt_as_opener"]

def adapt_as_file(iterable):
    """Wrap an iterable of strings so that it can be read like a file.

    The items of *iterable* are assumed to be strings. The returned
    object is an IterableAsFileAdapter, which offers the methods close(),
    read(), readline(), readlines(), seek() and tell(); refer to that
    class for the details of each method.

    typical usage:

        f = adapt_as_file(sequence_of_strings)
        for line in f:
            ...
            # use f as if it was a file opened for reading
    """
    adapter = IterableAsFileAdapter(iterable)
    return adapter

def adapt_as_opener(func):
    """Decorator turning a string generator function into a file opener.

    The decorated function accepts the same arguments as *func*; each call
    runs *func* and hands its result to adapt_as_file(), so the caller
    receives a file-like object opened for reading. The metadata of *func*
    is copied onto the returned function.

    typical usage:

        @adapt_as_opener
        def data_source(args):
            yield a_string
            yield another_string

        f = data_source(args)
        for line in f:
            ...
            # use f as if it was a file opened for reading
    """
    def opener(*args, **kwd):
        strings = func(*args, **kwd)
        return adapt_as_file(strings)
    # update_wrapper() returns the function it has just updated
    return update_wrapper(opener, func)


def _complain_ifclosed(closed):
    if closed:
        raise ValueError("I/O operation on closed file")
    
# This class implementation was inspired by the pure python implementation
# of the StringIO class in the python standard library

class IterableAsFileAdapter(object):
    """Read-only, forward-only file-like object fed by an iterable of strings.

    Empty items of the iterable are skipped. Only one item is kept in
    memory at a time:

    * ``buf`` is the item currently being read ('' initially and once the
      source is exhausted),
    * ``start`` and ``end`` are the absolute offsets of the beginning and
      of the end of ``buf`` (``start + len(buf) == end``),
    * ``pos`` is the current absolute position (``start <= pos <= end``),
    * ``source`` is the iterator over the remaining items, or None once it
      is exhausted or the object is closed.
    """

    def __init__(self, iterable):
        """Wrap *iterable*, whose items are assumed to be strings."""
        # filter(None, ...) drops the empty strings of the iterable
        self.source = filter(None, iterable)
        self.start = self.pos = self.end = 0
        self.buf = ''
        self._closed = False

    def is_sane(self):
        """Return a bool indicating that the file-like instance satisfies the
        class invariants defined in its implementation. A False value
        should not happen, it would mean an internal implementation error.
        """
        return bool(self._closed or (
            (self.start <= self.pos <= self.end)
            and (self.start + len(self.buf) == self.end)
            and ((self.source is not None) or self.buf == '')
            ))

    def __iter__(self):
        """Return self: the object is its own line iterator."""
        return self

    def next(self):
        """Return the next input line, or raise StopIteration at EOF.

        This is what drives ``for line in f`` loops; it is a thin wrapper
        around readline().
        """
        r = self.readline()
        if not r:
            raise StopIteration
        return r
    # Python 3 name of the iterator protocol method
    __next__ = next

    def __enter__(self):
        """Support the ``with`` statement; return self."""
        return self

    def __exit__(self, *args):
        """Close the object when leaving a ``with`` block."""
        self.close()

    @property
    def closed(self):
        """Bool indicating the current state of the file-like object. This is a
        read-only attribute; the close() method changes the value."""
        return self._closed

    def close(self):
        """Mark the object closed and drop the buffer and the source.

        Calling close() more than once is allowed.
        """
        if not self._closed:
            self._closed, self.buf, self.source = True, '', None
            self.start = self.pos = self.end = 0

    def isatty(self):
        """Return False. Raise ValueError if the object is closed."""
        _complain_ifclosed(self._closed)
        return False

    def _go_next(self):
        """Advance the file position to the beginning of the next string
        in the iterable source. For internal use only.

        When the source is exhausted, ``source`` becomes None and the
        buffer is left empty. Exceptions raised by the source itself are
        propagated to the caller.
        """
        self.start = self.pos = self.end
        if self.source is None:
            # Already at EOF (or closed): the buffer is empty, nothing to
            # fetch. Testing explicitly avoids relying on the TypeError
            # that next(None) would raise.
            return
        try:
            self.buf = next(self.source)
        except StopIteration:
            self.source, self.buf = None, ''
        else:
            self.end += len(self.buf)

    def seek(self, where, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end). This adapter
        class only supports seeking to positions at or beyond the current
        file position and does not support mode 2. A ValueError is raised
        when these conditions fail or when the object is closed. Seeking
        past the end of the data leaves the position at EOF.

        There is no return value.
        """
        _complain_ifclosed(self._closed)
        if mode == 2:
            raise ValueError("File-like object does not support seek() from eof.")
        elif mode == 1:
            where += self.pos # use absolute positioning
        if where < self.pos:
            raise ValueError("File-like object does not support backward seek().")
        while where > self.end:
            self._go_next()
            if self.source is None:
                # source exhausted: _go_next() left pos at EOF
                return
        self.pos = where

    def tell(self):
        """Return the file's current position."""
        _complain_ifclosed(self._closed)
        return self.pos

    def read(self, size = -1):
        """Read at most size characters from the file
        (less if the read hits EOF before obtaining size characters).

        If the size argument is negative, None or omitted, read all data
        until EOF is reached. The data is returned as a string object. An
        empty string is returned when EOF is encountered immediately.
        Raise ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)
        if self.source is None:
            return ''
        if size is None:
            # None means "no limit", as in readline(); comparing None
            # with 0 fails on Python 3
            size = -1
        L = list()
        if size < 0:
            # rest of the current buffer, then everything left in the source
            L.append(self.buf[self.pos-self.start:])
            L.extend(self.source)
            self.source, self.buf = None, ''
            s = ''.join(L)
            self.start = self.pos = self.end = self.pos + len(s)
            return s
        else:
            where = self.pos + size
            while where > self.end:
                L.append(self.buf[self.pos-self.start:])
                self._go_next()
                if self.source is None:
                    break
            else:
                # the target position lies inside the current buffer
                L.append(self.buf[self.pos-self.start:where-self.start])
                self.pos = where
            return ''.join(L)

    def readline(self, size = None):
        r"""Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the size argument is
        present and non-negative, it is a maximum character count (including
        the trailing newline) and an incomplete line may be returned.

        An empty string is returned when EOF is encountered immediately (or
        when size is 0). Raise ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)
        # ``stop`` is the absolute offset where the returned data ends;
        # -1 means that it is not known yet (no size limit).
        stop = -1 if (size is None or size < 0) else (self.pos + size)
        L = list()
        # Consume whole buffers until ``stop`` falls inside the current one.
        while not (0 <= stop <= self.end):
            newline = self.buf.find('\n', self.pos - self.start)
            if newline >= 0:
                # the line ends in this buffer, before any size limit
                stop = self.start + newline + 1
                break
            else:
                L.append(self.buf[self.pos - self.start:])
                self._go_next()
                if self.source is None:
                    # EOF reached: return the incomplete last line
                    return ''.join(L)
        else:
            # The size limit falls inside the current buffer: stop earlier
            # if a newline occurs before the limit.
            newline = self.buf.find('\n', self.pos - self.start,
                                            max(0, stop - self.start - 1))
            if newline >= 0:
                stop = self.start + newline + 1
        L.append(self.buf[self.pos - self.start: stop - self.start])
        self.pos = stop
        return ''.join(L)

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is positive, reading stops as soon
        as the lines read total at least sizehint characters instead of
        continuing up to EOF. A sizehint of 0 or None means no limit.
        """
        if sizehint is None:
            # None means "no limit"; comparing None with 0 fails on Python 3
            sizehint = 0
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def flush(self):
        """This is a no-op for this file-like object, apart from raising
        ValueError if the object is closed.
        """
        _complain_ifclosed(self._closed)

    @property
    def name(self):
        """Pseudo file name built from the class name, e.g.
        '<IterableAsFileAdapter>'."""
        return "<%s>" % self.__class__.__name__

# test code

def test():
    """Self-test of IterableAsFileAdapter.

    The sample text is read from the file named by the first command line
    argument. Without an argument, '/etc/passwd' is used when it can be
    read, and a synthetic text otherwise. The text is split into random
    chunks which are fed to the adapter; the results of read(), seek(),
    tell(), readlines() and line iteration are checked against the text.

    Raises AssertionError when a check fails, and IOError or OSError when
    the file named on the command line cannot be read.
    """
    import sys
    if sys.argv[1:]:
        # A file requested explicitly must be readable: let errors propagate.
        with open(sys.argv[1], 'r') as infile:
            text = infile.read(100000)
    else:
        try:
            with open('/etc/passwd', 'r') as infile:
                text = infile.read(100000)
        except (IOError, OSError):
            # The default file does not exist on every platform; fall back
            # to the synthetic text built below.
            text = ''
    if not text:
        text = "x"*70 + "\n"
    # Repeat small texts until there is enough data for the tests.
    while len(text) < 10000:
        text += text
    assert 10000 <= len(text) <= 100000

    def sample_iterable():
        # Cut the text into chunks of random lengths. The seed is fixed so
        # that every call yields the same sequence of chunks.
        from random import Random
        ra = Random()
        ra.seed(12345678901234567890)
        i = 0
        while i < len(text):
            d = ra.randint(10, 120)
            yield text[i: i+d]
            i += d
    def new():
        return IterableAsFileAdapter(sample_iterable())

    # read() without argument returns the whole text
    assert new().read() == text

    # read(size) by fixed-size blocks
    f = new()
    x = list()
    while True:
        x.append(f.read(31))
        assert f.is_sane()
        if len(x[-1]) != 31:
            assert f.tell() == len(text)
            break
    assert f.tell() == len(text)
    assert ''.join(x) == text

    # relative forward seek
    f = new()
    while True:
        p = f.tell()
        f.seek(47, 1)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # absolute forward seek
    f = new()
    while True:
        p = f.tell()
        f.seek(p+47)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # readlines() against a plain split of the text
    f = new()
    U, V = f.readlines(), text.split('\n')
    if not V[-1]:
        del V[-1]
    if not U[-1].endswith('\n'):
        U[-1] += '\n'
    assert len(U) == len(V)
    assert all(x == y + '\n' for x, y in zip(U, V))

    # context manager and line iteration
    with new() as f:
        for line in f:
            pass

# Run the self-test when the module is executed as a script.
if __name__ == "__main__":
    test()
Gribouillis 1,391 Programming Explorer Team Colleague

Improved interface, bugfix, tests added. Hopefully the last release :)

# Name: module adaptstrings.py
# Date: october 29 2011
# Author: Gribouillis for the python forum at www.daniweb.com
# License: public domain
# Use this code freely in your programs

"""Module to adapt sequences of strings to a file-like interface.
"""
from functools import update_wrapper
import sys
# Python 2 only: rebind the name filter to itertools.ifilter, which returns
# an iterator, so that filter() is lazy on both Python 2 and Python 3.
if sys.version_info < (3,):
    from itertools import ifilter as filter
    
# Version of this module (presumably a (major, minor) pair).
version_info = (1, 2)

# Names exported by a star-import of this module.
__all__ = ["adapt_as_file", "adapt_as_opener"]

def adapt_as_file(iterable):
    """Wrap an iterable of strings in a file-like object opened for reading.

    The items of the iterable are assumed to be strings. The returned
    IterableAsFileAdapter instance has the methods close(), read(),
    readline(), readlines(), seek() and tell(); see the documentation of
    that class for the details.

    typical usage:

        f = adapt_as_file(sequence_of_strings)
        for line in f:
            ...
            # use f as if it was a file opened for reading
    """
    adapter = IterableAsFileAdapter(iterable)
    return adapter

def adapt_as_opener(func):
    """Decorator turning a string generator function into a file opener.

    The decorated function takes the same arguments as func, calls it and
    returns its result wrapped by adapt_as_file(), that is to say a
    file-like object opened for reading. The metadata of func (name,
    docstring, ...) is copied to the returned function.

    typical usage:

        @adapt_as_opener
        def data_source(args):
            yield a_string
            yield another_string

        f = data_source(args)
        for line in f:
            ...
            # use f as if it was a file opened for reading
    """
    def wrapper(*args, **kwargs):
        strings = func(*args, **kwargs)
        return adapt_as_file(strings)
    # update_wrapper() returns the wrapper it has just updated
    return update_wrapper(wrapper, func)


def _complain_ifclosed(closed):
    """Raise ValueError when the closed flag is true, else do nothing."""
    if not closed:
        return
    raise ValueError("I/O operation on closed file")
    
# This class implementation was inspired by the pure python implementation
# of the StringIO class in the python standard library

class IterableAsFileAdapter(object):
    """File-like object opened for reading, backed by an iterable of strings.

    Only one string of the iterable is kept in memory at a time, hence the
    file position can only move forward: seek() raises ValueError for a
    backward move or for a position relative to the end of the file. The
    writing methods write(), writelines() and truncate() raise IOError.

    Internal state (the invariants are checked by is_sane()):

        source  iterator over the non-empty strings not yet loaded, or None
                once it is exhausted or the object is closed
        buf     the string currently loaded
        start   file position of the first character of buf
        end     file position just after the last character of buf
        pos     current file position, with start <= pos <= end
    """
    mode = 'r'
    softspace = 0
    encoding = None
    errors = None

    def __init__(self, iterable):
        """Wrap iterable, whose items are assumed to be strings."""
        # Empty strings carry no data: filter them out.
        self.source = filter(None, iterable)
        self.start = self.pos = self.end = 0
        self.buf = ''
        self._closed = False

    def is_sane(self):
        """Return a bool indicating that the file-like instance satisfies the
        class invariants defined in its implementation. A False value
        should not happen, it would mean an internal implementation error.
        """
        return bool(self._closed or (
            (self.start <= self.pos <= self.end)
            and (self.start + len(self.buf) == self.end)
            and ((self.source is not None) or self.buf == '')
            ))

    def __iter__(self):
        return self

    def next(self):
        """A file object is its own iterator, for example iter(f) returns f
        (unless f is closed). When a file is used as an iterator, typically
        in a for loop (for example, for line in f: print line), the next()
        method is called repeatedly. This method returns the next input line,
        or raises StopIteration when EOF is hit.
        """
        r = self.readline()
        if not r:
            raise StopIteration
        return r
    __next__ = next  # Python 3 name of the iterator method

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Leaving the with block closes the object, as for a regular file.
        self.close()

    @property
    def closed(self):
        """Bool indicating the current state of the file-like object. This is a
        read-only attribute; the close() method changes the value."""
        return self._closed

    def close(self):
        """Close the file-like object and free the memory buffer.

        Closing an already closed object has no effect.
        """
        if not self._closed:
            self._closed, self.buf, self.source = True, '', None
            self.start = self.pos = self.end = 0

    def isatty(self):
        """Return False.
        """
        _complain_ifclosed(self._closed)
        return False

    def _go_next(self):
        """Advance the file position to the beginning of the next string
        in the iterable source. For internal use only.

        When the source is exhausted, self.source is set to None, the
        buffer is emptied and the position stays at the end of the data.
        """
        self.start = self.pos = self.end
        if self.source is None:
            # Already at EOF: there is nothing more to load.
            return
        try:
            self.buf = next(self.source)
        except StopIteration:
            self.source, self.buf = None, ''
        else:
            self.end += len(self.buf)

    def seek(self, where, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end). This adapter
        class only supports seek to positions beyond the current file position
        and does not support mode 2. A ValueError is raised when these
        conditions fail.

        When the requested position lies beyond the end of the data, the
        position is left at EOF.

        There is no return value.
        """
        _complain_ifclosed(self._closed)
        if mode == 2:
            raise ValueError("File-like object does not support seek() from eof.")
        elif mode == 1:
            where += self.pos # use absolute positioning
        if where < self.pos:
            raise ValueError("File-like object does not support backward seek().")
        # Load strings until the buffer contains the target position.
        while where > self.end:
            self._go_next()
            if self.source is None:
                # EOF reached first: _go_next() left the position at EOF.
                return
        self.pos = where

    def tell(self):
        """Return the file's current position."""
        _complain_ifclosed(self._closed)
        return self.pos

    def read(self, size = -1):
        """Read at most size characters from the file
        (less if the read hits EOF before obtaining size characters).

        If the size argument is negative, None or omitted, read all data
        until EOF is reached. The data is returned as a string object. An
        empty string is returned when EOF is encountered immediately.
        """
        _complain_ifclosed(self._closed)
        if self.source is None:
            return ''
        L = list()
        # None is accepted like a negative size, as regular file objects do.
        if size is None or size < 0:
            L.append(self.buf[self.pos-self.start:])
            L.extend(self.source)
            self.source, self.buf = None, ''
            s = ''.join(L)
            self.start = self.pos = self.end = self.pos + len(s)
            return s
        else:
            where = self.pos + size
            while where > self.end:
                # Take the rest of the buffer and load the next string.
                L.append(self.buf[self.pos-self.start:])
                self._go_next()
                if self.source is None:
                    break
            else:
                # The target position is inside the current buffer.
                L.append(self.buf[self.pos-self.start:where-self.start])
                self.pos = where
            return ''.join(L)

    def readline(self, size = None):
        r"""Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the size argument is
        present and non-negative, it is a maximum character count (including
        the trailing newline) and an incomplete line may be returned.

        An empty string is returned only when EOF is encountered immediately.

        Note: Unlike stdio's fgets(), the returned string contains null
        characters ('\0') if they occurred in the input.
        """
        _complain_ifclosed(self._closed)
        # stop is the position where a size-limited read must end, or -1
        # when the line length is not limited.
        stop = -1 if (size is None or size < 0) else (self.pos + size)
        L = list()
        # Loop as long as the size limit, if any, lies beyond the buffer.
        while not (0 <= stop <= self.end):
            newline = self.buf.find('\n', self.pos - self.start)
            if newline >= 0:
                break
            else:
                # No newline in the rest of the buffer: take it all and
                # load the next string.
                L.append(self.buf[self.pos - self.start:])
                self._go_next()
                if self.source is None:
                    return ''.join(L)
        else:
            # The size limit falls inside the buffer: only look for a
            # newline before the limit.
            newline = self.buf.find('\n', self.pos - self.start,
                                stop - self.start) # newline < stop - self.start
        if newline >= 0:
            stop = self.start + newline + 1
        L.append(self.buf[self.pos - self.start: stop - self.start])
        self.pos = stop
        return ''.join(L)

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is present and positive, instead
        of reading up to EOF, whole lines totalling approximately sizehint
        characters (or more to accommodate a final whole line) are read.
        A sizehint of None, zero or less means read up to EOF.
        """
        if sizehint is None:
            # Regular file objects accept None as "no limit".
            sizehint = 0
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def xreadlines(self):
        """Deprecated method returning the same thing as iter(self)."""
        return iter(self)

    def flush(self):
        """This is a no-op for this file-like object.
        """
        _complain_ifclosed(self._closed)

    @property
    def name(self):
        """Pseudo file name: the class name between angle brackets."""
        return "<%s>" % self.__class__.__name__

    def _not_writable(self, *args, **kwd):
        """Raise IOError: the object is opened for reading only."""
        raise IOError("File not opened for writing")

    write = writelines = truncate = _not_writable

# test code

def test():
    """Self-test of IterableAsFileAdapter.

    The sample text is read from the file named by the first command line
    argument. Without an argument, '/etc/passwd' is used when it can be
    read, and a synthetic text otherwise. The text is split into random
    chunks which are fed to the adapter; the results of read(), seek(),
    tell(), readlines(), line iteration and size-limited readline() are
    checked against the text.

    Raises AssertionError when a check fails, and IOError or OSError when
    the file named on the command line cannot be read.
    """
    import sys
    if sys.argv[1:]:
        # A file requested explicitly must be readable: let errors propagate.
        with open(sys.argv[1], 'r') as infile:
            text = infile.read(100000)
    else:
        try:
            with open('/etc/passwd', 'r') as infile:
                text = infile.read(100000)
        except (IOError, OSError):
            # The default file does not exist on every platform; fall back
            # to the synthetic text built below.
            text = ''
    if not text:
        text = "x"*70 + "\n"
    # Repeat small texts until there is enough data for the tests.
    while len(text) < 10000:
        text += text
    assert 10000 <= len(text) <= 100000

    def sample_iterable():
        # Cut the text into chunks of random lengths. The seed is fixed so
        # that every call yields the same sequence of chunks.
        from random import Random
        ra = Random()
        ra.seed(12345678901234567890)
        i = 0
        while i < len(text):
            d = ra.randint(10, 120)
            yield text[i: i+d]
            i += d
    def new():
        return IterableAsFileAdapter(sample_iterable())

    # read() without argument returns the whole text
    assert new().read() == text

    # read(size) by fixed-size blocks
    f = new()
    x = list()
    while True:
        x.append(f.read(31))
        assert f.is_sane()
        if len(x[-1]) != 31:
            assert f.tell() == len(text)
            break
    assert f.tell() == len(text)
    assert ''.join(x) == text

    # relative forward seek
    f = new()
    while True:
        p = f.tell()
        f.seek(47, 1)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # absolute forward seek
    f = new()
    while True:
        p = f.tell()
        f.seek(p+47)
        assert f.is_sane()
        if p + 47 < len(text):
            assert f.tell() == p + 47
        else:
            assert f.tell() == len(text)
            break

    # readlines() against a plain split of the text
    f = new()
    U, V = f.readlines(), text.split('\n')
    if not V[-1]:
        del V[-1]
    if not U[-1].endswith('\n'):
        U[-1] += '\n'
    assert len(U) == len(V)
    assert all(x == y + '\n' for x, y in zip(U, V))

    # context manager and line iteration
    with new() as f:
        for line in f:
            assert f.is_sane()

    # readline(size): a short result ends a line or the file
    f = new()
    L = list()
    while True:
        line = f.readline(29)
        L.append(line)
        if line and len(line) != 29:
            assert line[-1] == "\n" or f.tell() == len(text)
        if not line:
            break
    assert ''.join(L) == text
    

# Run the self-test when the module is executed as a script.
if __name__ == "__main__":
    test()
Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.