# Name: module adaptstrings.py # Date: october 29 2011 # Author: Gribouillis for the python forum at www.daniweb.com # License: public domain # Use this code freely in your programs """Module to adapt sequences of strings to a file-like interface. """ __all__ = ["adapt_as_file", "adapt_as_opener"] def adapt_as_file(iterable): """Adapt the iterable argument to look like a file object opened for reading. It is assumed that the iterable's items are strings. Return a file-like object with methods close(), read(), readline(), readlines() seek(), tell(). See the documentation of the class IterableAsFileAdapter for complete documentation. typical usage: f = adapt_as_file(sequence_of_strings) for line in f: ... # use f as if it were a file opened for reading """ return IterableAsFileAdapter(iterable) def adapt_as_opener(func): """Decorator to adapt a string generator function as a function which return a file-like object opened for reading. See the documentation of adapt_as_file() for more. typical usage: @adapt_as_opener def data_source(args): yield a_string yield another_string f = data_source(args) for line in f: ... # use f as if it were a file opened for reading """ def wrapper(*args, **kwd): return adapt_as_file(func(*args, **kwd)) update_wrapper(wrapper, func) return wrapper def _complain_ifclosed(closed): if closed: raise ValueError, "I/O operation on closed file" # This class implementation was inspired by the pure python implementation # of the StringIO class in the python standard library class IterableAsFileAdapter(object): def __init__(self, iterable): self.source = iter(iterable) self.start = self.pos = self.end = 0 self.buf = '' self._closed = False def is_sane(self): """return a bool indicating that the file-like instance satisfies the class invariants defined in its implementation. A False value should not happen, it would mean an internal implementation error. """ return bool(self._closed or ( (self.start <= self.pos <= self.end) and (self.start + len(self.buf) == self.end) and ((self.source is not None) or self.buf == '') )) def __iter__(self): return self def next(self): """A file object is its own iterator, for example iter(f) returns f (unless f is closed). When a file is used as an iterator, typically in a for loop (for example, for line in f: print line), the next() method is called repeatedly. This method returns the next input line, or raises StopIteration when EOF is hit. """ r = self.readline() if not r: raise StopIteration return r def __enter__(self): return self def __exit__(self, *args): self.close() @property def closed(self): """bool indicating the current state of the file-like object. This is a read-only attribute; the close() method changes the value.""" return self._closed def close(self): """Free the memory buffer. """ if not self._closed: self._closed, self.buf, self.source = True, '', None self.start = self.pos = self.end = 0 def isatty(self): """Returns False. """ _complain_ifclosed(self._closed) return False def _go_next(self): """Advance the file position to the beginning of the next string in the iterable source. For internal use only. """ self.start = self.pos = self.end self.buf = '' if self.source is not None: try: self.buf = next(self.source) self.end += len(self.buf) except StopIteration: self.source = None def seek(self, where, mode = 0): """Set the file's current position. The mode argument is optional and defaults to 0 (absolute file positioning); other values are 1 (seek relative to the current position) and 2 (seek relative to the file's end). This adapter class only support seek to positions beyond the current file position and does not support mode 2. A ValueError is raised when these conditions fail. There is no return value. """ _complain_ifclosed(self._closed) if mode == 2: raise ValueError("File-like object does not support seek() from eof.") elif mode == 1: where += self.pos # use absolute positioning if where < self.pos: raise ValueError("File-like object does not support backward seek().") while where > self.end: self._go_next() if self.source is None: return self.pos = where def tell(self): """Return the file's current position.""" _complain_ifclosed(self._closed) return self.pos def read(self, n = -1): """Read at most size bytes from the file (less if the read hits EOF before obtaining size bytes). If the size argument is negative or omitted, read all data until EOF is reached. The bytes are returned as a string object. An empty string is returned when EOF is encountered immediately. """ _complain_ifclosed(self._closed) L = list() if n < 0: L.append(self.buf[self.pos-self.start:]) while self.source is not None: self._go_next() L.append(self.buf) else: where = self.pos + n while where > self.end: L.append(self.buf[self.pos-self.start:]) self._go_next() if self.source is None: break else: L.append(self.buf[self.pos-self.start:where-self.start]) self.pos = where return ''.join(L) def readline(self, length = None): r"""Read one entire line from the file. A trailing newline character is kept in the string (but may be absent when a file ends with an incomplete line). If the size argument is present and non-negative, it is a maximum byte count (including the trailing newline) and an incomplete line may be returned. An empty string is returned only when EOF is encountered immediately. Note: Unlike stdio's fgets(), the returned string contains null characters ('\0') if they occurred in the input. """ _complain_ifclosed(self._closed) if length is not None and length > 0: stop = self.pos + length else: stop = -1 L = list() while True: where = self.buf.find('\n', self.pos-self.start) if where < 0: # newline not found in buffer if self.pos <= stop <= self.end: L.append(self.buf[self.pos-self.start:stop-self.start]) self.pos = stop break else: L.append(self.buf[self.pos-self.start:]) self._go_next() if self.source is None: break else: where += 1 + self.start if 0 <= stop < where: where = stop L.append(self.buf[self.pos-self.start:where-self.start]) self.pos = where break return ''.join(L) def readlines(self, sizehint = 0): """Read until EOF using readline() and return a list containing the lines thus read. If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (or more to accommodate a final whole line). """ total = 0 lines =  line = self.readline() while line: lines.append(line) total += len(line) if 0 < sizehint <= total: break line = self.readline() return lines def flush(self): """This is a no-op for this file-like object. """ _complain_ifclosed(self._closed) @property def name(self): return "<%s>" % self.__class__.__name__ # test code def test(): import sys if sys.argv[1:]: file = sys.argv else: file = '/etc/passwd' text = open(file, 'r').read(100000) if not len(text): text = "x"*70 + "\n" while len(text) < 10000: text += text assert 10000 <= len(text) < 100000 def sample_iterable(): from random import Random ra = Random() ra.seed(12345678901234567890) i = 0 while i < len(text): d = ra.randint(10, 120) yield text[i: i+d] i += d def new(): return IterableAsFileAdapter(sample_iterable()) assert new().read() == text f = new() x = list() while True: x.append(f.read(31)) assert f.is_sane() if len(x[-1]) != 31: assert f.tell() == len(text) break assert f.tell() == len(text) assert ''.join(x) == text f = new() while True: p = f.tell() f.seek(47, 1) assert f.is_sane() if p + 47 < len(text): assert f.tell() == p + 47 else: assert f.tell() == len(text) break f = new() while True: p = f.tell() f.seek(p+47) assert f.is_sane() if p + 47 < len(text): assert f.tell() == p + 47 else: assert f.tell() == len(text) break f = new() U, V = f.readlines(), text.split('\n') if not V[-1]: del V[-1] assert len(U) == len(V) assert all(x == y + '\n' for x, y in zip(U, V)) with new() as f: for line in f: pass if __name__ == "__main__": test()
Edited by Gribouillis: n/a
Edited by Gribouillis: n/a
Are you able to help answer this sponsored question?
Questions asked by members who have earned a lot of community kudos are featured in order to give back and encourage quality replies.