我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用email.message.readline()。
def _generate_toc(self): """Generate key-to-(start, stop) table of contents.""" starts, stops = [], [] self._file.seek(0) while True: line_pos = self._file.tell() line = self._file.readline() if line.startswith('From '): if len(stops) < len(starts): stops.append(line_pos - len(os.linesep)) starts.append(line_pos) elif line == '': stops.append(line_pos) break self._toc = dict(enumerate(zip(starts, stops))) self._next_key = len(self._toc) self._file_length = self._file.tell()
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip '1,' line specifying labels. original_headers = StringIO.StringIO() while True: line = self._file.readline() if line == '*** EOOH ***' + os.linesep or line == '': break original_headers.write(line.replace(os.linesep, '\n')) visible_headers = StringIO.StringIO() while True: line = self._file.readline() if line == os.linesep or line == '': break visible_headers.write(line.replace(os.linesep, '\n')) body = self._file.read(stop - self._file.tell()).replace(os.linesep, '\n') msg = BabylMessage(original_headers.getvalue() + body) msg.set_visible(visible_headers.getvalue()) if key in self._labels: msg.set_labels(self._labels[key]) return msg
def get_string(self, key): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip '1,' line specifying labels. original_headers = StringIO.StringIO() while True: line = self._file.readline() if line == '*** EOOH ***' + os.linesep or line == '': break original_headers.write(line.replace(os.linesep, '\n')) while True: line = self._file.readline() if line == os.linesep or line == '': break return original_headers.getvalue() + \ self._file.read(stop - self._file.tell()).replace(os.linesep, '\n')
def _generate_toc(self): """Generate key-to-(start, stop) table of contents.""" starts, stops = [], [] self._file.seek(0) while True: line_pos = self._file.tell() line = self._file.readline() if line.startswith(b'From '): if len(stops) < len(starts): stops.append(line_pos - len(linesep)) starts.append(line_pos) elif not line: stops.append(line_pos) break self._toc = dict(enumerate(zip(starts, stops))) self._next_key = len(self._toc) self._file_length = self._file.tell()
def get_bytes(self, key): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip b'1,' line specifying labels. original_headers = io.BytesIO() while True: line = self._file.readline() if line == b'*** EOOH ***' + linesep or not line: break original_headers.write(line.replace(linesep, b'\n')) while True: line = self._file.readline() if line == linesep or not line: break headers = original_headers.getvalue() n = stop - self._file.tell() assert n >= 0 data = self._file.read(n) data = data.replace(linesep, b'\n') return headers + data
def _dump_message(self, message, target, mangle_from_=False): # Most files are opened in binary mode to allow predictable seeking. # To get native line endings on disk, the user-friendly \n line endings # used in strings and by email.Message are translated here. """Dump message contents to target file.""" if isinstance(message, email.message.Message): buffer = StringIO.StringIO() gen = email.generator.Generator(buffer, mangle_from_, 0) gen.flatten(message) buffer.seek(0) target.write(buffer.read().replace('\n', os.linesep)) elif isinstance(message, str): if mangle_from_: message = message.replace('\nFrom ', '\n>From ') message = message.replace('\n', os.linesep) target.write(message) elif hasattr(message, 'read'): while True: line = message.readline() if line == '': break if mangle_from_ and line.startswith('From '): line = '>From ' + line[5:] line = line.replace('\n', os.linesep) target.write(line) else: raise TypeError('Invalid message type: %s' % type(message))
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) from_line = self._file.readline().replace(os.linesep, '') string = self._file.read(stop - self._file.tell()) msg = self._message_factory(string.replace(os.linesep, '\n')) msg.set_from(from_line[5:]) return msg
def get_string(self, key, from_=False): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) if not from_: self._file.readline() string = self._file.read(stop - self._file.tell()) return string.replace(os.linesep, '\n')
def get_file(self, key, from_=False): """Return a file-like representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) if not from_: self._file.readline() return _PartialFile(self._file, self._file.tell(), stop)
def _generate_toc(self): """Generate key-to-(start, stop) table of contents.""" starts, stops = [], [] self._file.seek(0) next_pos = 0 label_lists = [] while True: line_pos = next_pos line = self._file.readline() next_pos = self._file.tell() if line == '\037\014' + os.linesep: if len(stops) < len(starts): stops.append(line_pos - len(os.linesep)) starts.append(next_pos) labels = [label.strip() for label in self._file.readline()[1:].split(',') if label.strip() != ''] label_lists.append(labels) elif line == '\037' or line == '\037' + os.linesep: if len(stops) < len(starts): stops.append(line_pos - len(os.linesep)) elif line == '': stops.append(line_pos - len(os.linesep)) break self._toc = dict(enumerate(zip(starts, stops))) self._labels = dict(enumerate(label_lists)) self._next_key = len(self._toc) self._file.seek(0, 2) self._file_length = self._file.tell()
def readline(self, size=None): """Read a line.""" return self._read(size, self._file.readline)
def __iter__(self): """Iterate over lines.""" return iter(self.readline, "")
def _search_end(self): self.fp.readline() # Throw away header line while 1: pos = self.fp.tell() line = self.fp.readline() if not line: return if line[:5] == 'From ' and self._isrealfromline(line): self.fp.seek(pos) return # An overridable mechanism to test for From-line-ness. You can either # specify a different regular expression or define a whole new # _isrealfromline() method. Note that this only gets called for lines # starting with the 5 characters "From ". # # BAW: According to #http://home.netscape.com/eng/mozilla/2.0/relnotes/demo/content-length.html # the only portable, reliable way to find message delimiters in a BSD (i.e # Unix mailbox) style folder is to search for "\n\nFrom .*\n", or at the # beginning of the file, "^From .*\n". While _fromlinepattern below seems # like a good idea, in practice, there are too many variations for more # strict parsing of the line to be completely accurate. # # _strict_isrealfromline() is the old version which tries to do stricter # parsing of the From_ line. _portable_isrealfromline() simply returns # true, since it's never called if the line doesn't already start with # "From ". # # This algorithm, and the way it interacts with _search_start() and # _search_end() may not be completely correct, because it doesn't check # that the two characters preceding "From " are \n\n or the beginning of # the file. Fixing this would require a more extensive rewrite than is # necessary. For convenience, we've added a PortableUnixMailbox class # which does no checking of the format of the 'From' line.
def _search_start(self): while 1: line = self.fp.readline() if not line: raise EOFError if line[:5] == '\001\001\001\001\n': return
def _search_end(self): while 1: pos = self.fp.tell() line = self.fp.readline() if not line: return if line == '\001\001\001\001\n': self.fp.seek(pos) return
def _search_start(self): while 1: line = self.fp.readline() if not line: raise EOFError if line == '*** EOOH ***\n': return
def _search_end(self): while 1: pos = self.fp.tell() line = self.fp.readline() if not line: return if line == '\037\014\n' or line == '\037': self.fp.seek(pos) return ## End: classes from the original module (for backward compatibility).
def _dump_message(self, message, target, mangle_from_=False): # Most files are opened in binary mode to allow predictable seeking. # To get native line endings on disk, the user-friendly \n line endings # used in strings and by email.Message are translated here. """Dump message contents to target file.""" if isinstance(message, email.message.Message): buffer = StringIO.StringIO() gen = email.generator.Generator(buffer, mangle_from_, 0) gen.flatten(message) buffer.seek(0) data = buffer.read().replace('\n', os.linesep) target.write(data) if self._append_newline and not data.endswith(os.linesep): # Make sure the message ends with a newline target.write(os.linesep) elif isinstance(message, str): if mangle_from_: message = message.replace('\nFrom ', '\n>From ') message = message.replace('\n', os.linesep) target.write(message) if self._append_newline and not message.endswith(os.linesep): # Make sure the message ends with a newline target.write(os.linesep) elif hasattr(message, 'read'): lastline = None while True: line = message.readline() if line == '': break if mangle_from_ and line.startswith('From '): line = '>From ' + line[5:] line = line.replace('\n', os.linesep) target.write(line) lastline = line if self._append_newline and lastline and not lastline.endswith(os.linesep): # Make sure the message ends with a newline target.write(os.linesep) else: raise TypeError('Invalid message type: %s' % type(message))
def _generate_toc(self): """Generate key-to-(start, stop) table of contents.""" starts, stops = [], [] last_was_empty = False self._file.seek(0) while True: line_pos = self._file.tell() line = self._file.readline() if line.startswith('From '): if len(stops) < len(starts): if last_was_empty: stops.append(line_pos - len(os.linesep)) else: # The last line before the "From " line wasn't # blank, but we consider it a start of a # message anyway. stops.append(line_pos) starts.append(line_pos) last_was_empty = False elif not line: if last_was_empty: stops.append(line_pos - len(os.linesep)) else: stops.append(line_pos) break elif line == os.linesep: last_was_empty = True else: last_was_empty = False self._toc = dict(enumerate(zip(starts, stops))) self._next_key = len(self._toc) self._file_length = self._file.tell()
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) from_line = self._file.readline().replace(linesep, b'') string = self._file.read(stop - self._file.tell()) msg = self._message_factory(string.replace(linesep, b'\n')) msg.set_from(from_line[5:].decode('ascii')) return msg
def get_bytes(self, key, from_=False): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) if not from_: self._file.readline() string = self._file.read(stop - self._file.tell()) return string.replace(linesep, b'\n')
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip b'1,' line specifying labels. original_headers = io.BytesIO() while True: line = self._file.readline() if line == b'*** EOOH ***' + linesep or not line: break original_headers.write(line.replace(linesep, b'\n')) visible_headers = io.BytesIO() while True: line = self._file.readline() if line == linesep or not line: break visible_headers.write(line.replace(linesep, b'\n')) # Read up to the stop, or to the end n = stop - self._file.tell() assert n >= 0 body = self._file.read(n) body = body.replace(linesep, b'\n') msg = BabylMessage(original_headers.getvalue() + body) msg.set_visible(visible_headers.getvalue()) if key in self._labels: msg.set_labels(self._labels[key]) return msg