我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用email.message.replace()。
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip '1,' line specifying labels. original_headers = StringIO.StringIO() while True: line = self._file.readline() if line == '*** EOOH ***' + os.linesep or line == '': break original_headers.write(line.replace(os.linesep, '\n')) visible_headers = StringIO.StringIO() while True: line = self._file.readline() if line == os.linesep or line == '': break visible_headers.write(line.replace(os.linesep, '\n')) body = self._file.read(stop - self._file.tell()).replace(os.linesep, '\n') msg = BabylMessage(original_headers.getvalue() + body) msg.set_visible(visible_headers.getvalue()) if key in self._labels: msg.set_labels(self._labels[key]) return msg
def get_bytes(self, key): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip b'1,' line specifying labels. original_headers = io.BytesIO() while True: line = self._file.readline() if line == b'*** EOOH ***' + linesep or not line: break original_headers.write(line.replace(linesep, b'\n')) while True: line = self._file.readline() if line == linesep or not line: break headers = original_headers.getvalue() n = stop - self._file.tell() assert n >= 0 data = self._file.read(n) data = data.replace(linesep, b'\n') return headers + data
def _create_tmp(self): """Create a file in the tmp subdirectory and open and return it.""" now = time.time() hostname = socket.gethostname() if '/' in hostname: hostname = hostname.replace('/', r'\057') if ':' in hostname: hostname = hostname.replace(':', r'\072') uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(), Maildir._count, hostname) path = os.path.join(self._path, 'tmp', uniq) try: os.stat(path) except FileNotFoundError: Maildir._count += 1 try: return _create_carefully(path) except FileExistsError: pass # Fall through to here if stat succeeded or open raised EEXIST. raise ExternalClashError('Name clash prevented file creation: %s' % path)
def _dump_message(self, message, target, mangle_from_=False): # Most files are opened in binary mode to allow predictable seeking. # To get native line endings on disk, the user-friendly \n line endings # used in strings and by email.Message are translated here. """Dump message contents to target file.""" if isinstance(message, email.message.Message): buffer = StringIO.StringIO() gen = email.generator.Generator(buffer, mangle_from_, 0) gen.flatten(message) buffer.seek(0) target.write(buffer.read().replace('\n', os.linesep)) elif isinstance(message, str): if mangle_from_: message = message.replace('\nFrom ', '\n>From ') message = message.replace('\n', os.linesep) target.write(message) elif hasattr(message, 'read'): while True: line = message.readline() if line == '': break if mangle_from_ and line.startswith('From '): line = '>From ' + line[5:] line = line.replace('\n', os.linesep) target.write(line) else: raise TypeError('Invalid message type: %s' % type(message))
def _create_tmp(self): """Create a file in the tmp subdirectory and open and return it.""" now = time.time() hostname = socket.gethostname() if '/' in hostname: hostname = hostname.replace('/', r'\057') if ':' in hostname: hostname = hostname.replace(':', r'\072') uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(), Maildir._count, hostname) path = os.path.join(self._path, 'tmp', uniq) try: os.stat(path) except OSError, e: if e.errno == errno.ENOENT: Maildir._count += 1 try: return _create_carefully(path) except OSError, e: if e.errno != errno.EEXIST: raise else: raise # Fall through to here if stat succeeded or open raised EEXIST. raise ExternalClashError('Name clash prevented file creation: %s' % path)
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) from_line = self._file.readline().replace(os.linesep, '') string = self._file.read(stop - self._file.tell()) msg = self._message_factory(string.replace(os.linesep, '\n')) msg.set_from(from_line[5:]) return msg
def get_string(self, key, from_=False): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) if not from_: self._file.readline() string = self._file.read(stop - self._file.tell()) return string.replace(os.linesep, '\n')
def get_file(self, key): """Return a file-like representation or raise a KeyError.""" return StringIO.StringIO(self.get_string(key).replace('\n', os.linesep))
def _dump_message(self, message, target, mangle_from_=False): # Most files are opened in binary mode to allow predictable seeking. # To get native line endings on disk, the user-friendly \n line endings # used in strings and by email.Message are translated here. """Dump message contents to target file.""" if isinstance(message, email.message.Message): buffer = StringIO.StringIO() gen = email.generator.Generator(buffer, mangle_from_, 0) gen.flatten(message) buffer.seek(0) data = buffer.read().replace('\n', os.linesep) target.write(data) if self._append_newline and not data.endswith(os.linesep): # Make sure the message ends with a newline target.write(os.linesep) elif isinstance(message, str): if mangle_from_: message = message.replace('\nFrom ', '\n>From ') message = message.replace('\n', os.linesep) target.write(message) if self._append_newline and not message.endswith(os.linesep): # Make sure the message ends with a newline target.write(os.linesep) elif hasattr(message, 'read'): lastline = None while True: line = message.readline() if line == '': break if mangle_from_ and line.startswith('From '): line = '>From ' + line[5:] line = line.replace('\n', os.linesep) target.write(line) lastline = line if self._append_newline and lastline and not lastline.endswith(os.linesep): # Make sure the message ends with a newline target.write(os.linesep) else: raise TypeError('Invalid message type: %s' % type(message))
def get_bytes(self, key): """Return a bytes representation or raise a KeyError.""" f = open(os.path.join(self._path, self._lookup(key)), 'rb') try: return f.read().replace(linesep, b'\n') finally: f.close()
def _create_tmp(self): """Create a file in the tmp subdirectory and open and return it.""" now = time.time() hostname = socket.gethostname() if '/' in hostname: hostname = hostname.replace('/', r'\057') if ':' in hostname: hostname = hostname.replace(':', r'\072') uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(), Maildir._count, hostname) path = os.path.join(self._path, 'tmp', uniq) try: os.stat(path) except OSError as e: if e.errno == errno.ENOENT: Maildir._count += 1 try: return _create_carefully(path) except OSError as e: if e.errno != errno.EEXIST: raise else: raise # Fall through to here if stat succeeded or open raised EEXIST. raise ExternalClashError('Name clash prevented file creation: %s' % path)
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) from_line = self._file.readline().replace(linesep, b'') string = self._file.read(stop - self._file.tell()) msg = self._message_factory(string.replace(linesep, b'\n')) msg.set_from(from_line[5:].decode('ascii')) return msg
def get_bytes(self, key, from_=False): """Return a string representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) if not from_: self._file.readline() string = self._file.read(stop - self._file.tell()) return string.replace(linesep, b'\n')
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip b'1,' line specifying labels. original_headers = io.BytesIO() while True: line = self._file.readline() if line == b'*** EOOH ***' + linesep or not line: break original_headers.write(line.replace(linesep, b'\n')) visible_headers = io.BytesIO() while True: line = self._file.readline() if line == linesep or not line: break visible_headers.write(line.replace(linesep, b'\n')) # Read up to the stop, or to the end n = stop - self._file.tell() assert n >= 0 body = self._file.read(n) body = body.replace(linesep, b'\n') msg = BabylMessage(original_headers.getvalue() + body) msg.set_visible(visible_headers.getvalue()) if key in self._labels: msg.set_labels(self._labels[key]) return msg
def get_file(self, key): """Return a file-like representation or raise a KeyError.""" return io.BytesIO(self.get_bytes(key).replace(b'\n', linesep))
def get_bytes(self, key): """Return a bytes representation or raise a KeyError.""" with open(os.path.join(self._path, self._lookup(key)), 'rb') as f: return f.read().replace(linesep, b'\n')