我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明 tell() 的用法（原索引条目为 email.message.tell()，示例中实际调用的是文件对象的 tell() 方法）。
def _generate_toc(self):
    """Generate key-to-(start, stop) table of contents.

    Scans ``self._file`` from offset 0 and treats every line beginning
    with ``'From '`` as the start of a message.  On completion it sets:

    * ``self._toc`` -- dict mapping 0-based index to ``(start, stop)``
      file offsets,
    * ``self._next_key`` -- number of messages found,
    * ``self._file_length`` -- file position after the scan (end of file).
    """
    starts, stops = [], []
    # True when the line read just before the current one consisted only of
    # a line separator.
    last_was_empty = False
    self._file.seek(0)
    while True:
        line_pos = self._file.tell()
        line = self._file.readline()
        if line.startswith('From '):
            if len(stops) < len(starts):
                # Close the message that is still open.  Only a blank
                # separator line is excluded from it; if the previous line
                # was real content, trimming len(os.linesep) would cut off
                # that line's terminator.
                if last_was_empty:
                    stops.append(line_pos - len(os.linesep))
                else:
                    stops.append(line_pos)
            starts.append(line_pos)
            last_was_empty = False
        elif not line:
            # End of file: the open message (if any) runs to here.  A
            # surplus stop for a file with no messages is dropped by zip().
            stops.append(line_pos)
            break
        else:
            last_was_empty = line == os.linesep
    self._toc = dict(enumerate(zip(starts, stops)))
    self._next_key = len(self._toc)
    self._file_length = self._file.tell()
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    The record located by ``self._lookup(key)`` is read in three parts:
    the original headers (up to the ``*** EOOH ***`` line), the visible
    headers (up to the first blank line) and the body (up to ``stop``).
    Line separators are normalised to ``'\\n'``.  The returned
    ``BabylMessage`` is built from original headers plus body, and is then
    given the visible headers and, when present, the labels stored for
    ``key``.  Any exception raised by ``self._lookup`` propagates.
    """
    start, stop = self._lookup(key)
    source = self._file
    source.seek(start)
    source.readline()  # Skip '1,' line specifying labels.

    eooh_line = '*** EOOH ***' + os.linesep
    original_parts = []
    line = source.readline()
    while line != eooh_line and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = source.readline()

    visible_parts = []
    line = source.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = source.readline()

    remaining = stop - source.tell()
    body = source.read(remaining).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the record's original headers (the lines before
    ``*** EOOH ***``) followed by its body; the visible-header section in
    between is read and discarded.  Line separators are normalised to
    ``'\\n'``.  Any exception raised by ``self._lookup`` propagates.
    """
    start, stop = self._lookup(key)
    source = self._file
    source.seek(start)
    source.readline()  # Skip '1,' line specifying labels.

    eooh_line = '*** EOOH ***' + os.linesep
    original_parts = []
    line = source.readline()
    while line != eooh_line and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = source.readline()

    # Consume the visible headers; they end at a blank line or at EOF.
    line = source.readline()
    while line != os.linesep and line != '':
        line = source.readline()

    headers = ''.join(original_parts)
    body = source.read(stop - source.tell())
    return headers + body.replace(os.linesep, '\n')
def _append_message(self, message):
    """Append message to mailbox and return (start, stop) offsets.

    Writing starts at the end of ``self._file``.  If any of the per-message
    hooks or ``_install_message`` raises, the file is truncated back to its
    length at entry and the exception is re-raised.  On success the file is
    flushed and ``self._file_length`` is updated.
    """
    self._file.seek(0, 2)
    rollback_to = self._file.tell()
    # This is the first message, and the _pre_mailbox_hook hasn't yet been
    # called.  If self._pending is True, messages have been removed, so
    # _pre_mailbox_hook must have been called already.
    needs_mailbox_hook = len(self._toc) == 0 and not self._pending
    if needs_mailbox_hook:
        self._pre_mailbox_hook(self._file)
    try:
        self._pre_message_hook(self._file)
        offsets = self._install_message(message)
        self._post_message_hook(self._file)
    except BaseException:
        # Undo the partial write before propagating the error.
        self._file.truncate(rollback_to)
        raise
    else:
        self._file.flush()
        # Record current length of mailbox
        self._file_length = self._file.tell()
        return offsets
def _install_message(self, message):
    """Format a message and blindly write to self._file.

    A 'From ' line is chosen as follows: taken from the first line of a
    string that starts with ``'From '``; built from ``get_from()`` for an
    ``_mboxMMDFMessage``; taken from ``get_unixfrom()`` for an
    ``email.message.Message``; otherwise (or if that yielded None) a
    ``MAILER-DAEMON`` line stamped with the current GMT time.  The line is
    written followed by ``os.linesep``, then the message is written through
    ``self._dump_message``.

    Returns the ``(start, stop)`` file offsets of what was written.
    """
    from_line = None
    if isinstance(message, str) and message.startswith('From '):
        # Text before the first '\n' is the 'From ' line; the remainder
        # (empty when there is no '\n') is the message itself.
        from_line, _, message = message.partition('\n')
    elif isinstance(message, _mboxMMDFMessage):
        from_line = 'From ' + message.get_from()
    elif isinstance(message, email.message.Message):
        from_line = message.get_unixfrom()  # May be None.
    if from_line is None:
        from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())

    target = self._file
    start = target.tell()
    target.write(from_line + os.linesep)
    self._dump_message(message, target, self._mangle_from_)
    return (start, target.tell())
def next(self):
    """Return the next non-empty message, or None when none is left.

    Starting from ``self.seekp``, ``_search_start`` and ``_search_end``
    position ``self.fp`` at the bounds of a message.  Zero-length spans are
    skipped.  ``self.seekp`` is advanced past whatever was scanned.  When
    ``_search_start`` raises EOFError, None is returned; otherwise the
    result is ``self.factory`` applied to a ``_PartialFile`` over the span.
    """
    while True:
        self.fp.seek(self.seekp)
        try:
            self._search_start()
        except EOFError:
            self.seekp = self.fp.tell()
            return None
        start = self.fp.tell()
        self._search_end()
        stop = self.fp.tell()
        self.seekp = stop
        if start != stop:
            return self.factory(_PartialFile(self.fp, start, stop))

# Recommended to use PortableUnixMailbox instead!
def _generate_toc(self):
    """Generate key-to-(start, stop) table of contents.

    Scans ``self._file`` (read as bytes) from offset 0 and treats every
    line beginning with ``b'From '`` as the start of a message.  On
    completion it sets:

    * ``self._toc`` -- dict mapping 0-based index to ``(start, stop)``
      file offsets,
    * ``self._next_key`` -- number of messages found,
    * ``self._file_length`` -- file position after the scan (end of file).
    """
    starts, stops = [], []
    # True when the line read just before the current one consisted only of
    # a line separator.
    last_was_empty = False
    self._file.seek(0)
    while True:
        line_pos = self._file.tell()
        line = self._file.readline()
        if line.startswith(b'From '):
            if len(stops) < len(starts):
                # Close the message that is still open.  Only a blank
                # separator line is excluded from it; if the previous line
                # was real content, trimming len(linesep) would cut off
                # that line's terminator.
                if last_was_empty:
                    stops.append(line_pos - len(linesep))
                else:
                    stops.append(line_pos)
            starts.append(line_pos)
            last_was_empty = False
        elif not line:
            # End of file: the open message (if any) runs to here.  A
            # surplus stop for a file with no messages is dropped by zip().
            stops.append(line_pos)
            break
        else:
            last_was_empty = line == linesep
    self._toc = dict(enumerate(zip(starts, stops)))
    self._next_key = len(self._toc)
    self._file_length = self._file.tell()
def get_bytes(self, key):
    """Return a string representation or raise a KeyError.

    The result is a bytes object: the record's original headers (the lines
    before ``*** EOOH ***``) followed by its body.  The visible-header
    section in between is read and discarded.  Line separators are
    normalised to ``b'\\n'``.  Any exception raised by ``self._lookup``
    propagates.
    """
    start, stop = self._lookup(key)
    source = self._file
    source.seek(start)
    source.readline()  # Skip b'1,' line specifying labels.

    eooh_line = b'*** EOOH ***' + linesep
    header_parts = []
    line = source.readline()
    while line and line != eooh_line:
        header_parts.append(line.replace(linesep, b'\n'))
        line = source.readline()

    # Consume the visible headers; they end at a blank line or at EOF.
    line = source.readline()
    while line and line != linesep:
        line = source.readline()

    headers = b''.join(header_parts)
    n = stop - source.tell()
    assert n >= 0
    body = source.read(n).replace(linesep, b'\n')
    return headers + body
def _append_message(self, message):
    """Append message to mailbox and return (start, stop) offsets.

    Writing starts at the end of ``self._file``.  If a hook or
    ``_install_message`` raises, the file is truncated back to its length
    at entry and the exception is re-raised.  On success the file is
    flushed and ``self._file_length`` is updated.
    """
    self._file.seek(0, 2)
    rollback_to = self._file.tell()
    try:
        self._pre_message_hook(self._file)
        offsets = self._install_message(message)
        self._post_message_hook(self._file)
    except BaseException:
        # Undo the partial write before propagating the error.
        self._file.truncate(rollback_to)
        raise
    else:
        self._file.flush()
        # Record current length of mailbox
        self._file_length = self._file.tell()
        return offsets
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    The first line of the record located by ``self._lookup(key)`` is read
    separately; the rest, up to ``stop``, is passed to
    ``self._message_factory`` with line separators normalised to ``'\\n'``.
    The first line minus its first five characters (presumably the
    ``'From '`` prefix) is handed to the message's ``set_from``.  Any
    exception raised by ``self._lookup`` propagates.
    """
    start, stop = self._lookup(key)
    source = self._file
    source.seek(start)
    first_line = source.readline().replace(os.linesep, '')
    remaining = stop - source.tell()
    text = source.read(remaining).replace(os.linesep, '\n')
    msg = self._message_factory(text)
    msg.set_from(first_line[5:])
    return msg
def get_string(self, key, from_=False):
    """Return a string representation or raise a KeyError.

    Reads the record located by ``self._lookup(key)`` and returns it with
    line separators normalised to ``'\\n'``.  Unless ``from_`` is true, the
    record's first line is left out.  Any exception raised by
    ``self._lookup`` propagates.
    """
    start, stop = self._lookup(key)
    source = self._file
    source.seek(start)
    if not from_:
        source.readline()  # Discard the first line of the record.
    length = stop - source.tell()
    return source.read(length).replace(os.linesep, '\n')
def get_file(self, key, from_=False):
    """Return a file-like representation or raise a KeyError.

    The result is a ``_PartialFile`` over ``self._file`` that ends at the
    record's ``stop`` offset.  It begins at the record's start when
    ``from_`` is true, otherwise just after the record's first line.  Any
    exception raised by ``self._lookup`` propagates.
    """
    start, stop = self._lookup(key)
    source = self._file
    source.seek(start)
    if not from_:
        source.readline()  # Advance past the first line of the record.
    begin = source.tell()
    return _PartialFile(source, begin, stop)
def _pre_message_hook(self, f):
    """Called before writing each message to file f.

    Writes one ``os.linesep`` to ``f`` unless ``f`` is positioned at
    offset 0.
    """
    at_file_start = f.tell() == 0
    if not at_file_start:
        f.write(os.linesep)
def _generate_toc(self):
    """Generate key-to-(start, stop) table of contents.

    Scans ``self._file`` from offset 0.  A line starting with four
    ``\\001`` characters plus ``os.linesep`` opens a message, which begins
    on the following line.  The message ends just before the line
    separator that precedes the next such delimiter line, or at end of
    file if no delimiter follows.  On completion it sets ``self._toc``
    (index -> ``(start, stop)``), ``self._next_key`` (message count) and
    ``self._file_length`` (offset of end of file).
    """
    starts, stops = [], []
    delimiter = '\001\001\001\001' + os.linesep
    source = self._file
    source.seek(0)
    while True:
        line = source.readline()
        cursor = source.tell()  # Offset just past the line read above.
        if line == '':
            break
        if not line.startswith(delimiter):
            continue
        starts.append(cursor)
        # Read forward to the closing delimiter (or EOF) of this message.
        while True:
            line_start = cursor
            line = source.readline()
            cursor = source.tell()
            if line == delimiter:
                stops.append(line_start - len(os.linesep))
                break
            if line == '':
                stops.append(line_start)
                break
    self._toc = dict(enumerate(zip(starts, stops)))
    self._next_key = len(self._toc)
    source.seek(0, 2)
    self._file_length = source.tell()