我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用email.Message()。
def __init__(self, dirname, factory=rfc822.Message, create=True): """Initialize a Maildir instance.""" Mailbox.__init__(self, dirname, factory, create) self._paths = { 'tmp': os.path.join(self._path, 'tmp'), 'new': os.path.join(self._path, 'new'), 'cur': os.path.join(self._path, 'cur'), } if not os.path.exists(self._path): if create: os.mkdir(self._path, 0700) for path in self._paths.values(): os.mkdir(path, 0o700) else: raise NoSuchMailboxError(self._path) self._toc = {} self._toc_mtimes = {} for subdir in ('cur', 'new'): self._toc_mtimes[subdir] = os.path.getmtime(self._paths[subdir]) self._last_read = time.time() # Records last time we read cur/new self._skewfactor = 0.1 # Adjust if os/fs clocks are skewing
def get_message(self, key): """Return a Message representation or raise a KeyError.""" subpath = self._lookup(key) f = open(os.path.join(self._path, subpath), 'r') try: if self._factory: msg = self._factory(f) else: msg = MaildirMessage(f) finally: f.close() subdir, name = os.path.split(subpath) msg.set_subdir(subdir) if self.colon in name: msg.set_info(name.split(self.colon)[-1]) msg.set_date(os.path.getmtime(os.path.join(self._path, subpath))) return msg
def get_message(self, key): """Return a Message representation or raise a KeyError.""" start, stop = self._lookup(key) self._file.seek(start) self._file.readline() # Skip '1,' line specifying labels. original_headers = StringIO.StringIO() while True: line = self._file.readline() if line == '*** EOOH ***' + os.linesep or line == '': break original_headers.write(line.replace(os.linesep, '\n')) visible_headers = StringIO.StringIO() while True: line = self._file.readline() if line == os.linesep or line == '': break visible_headers.write(line.replace(os.linesep, '\n')) body = self._file.read(stop - self._file.tell()).replace(os.linesep, '\n') msg = BabylMessage(original_headers.getvalue() + body) msg.set_visible(visible_headers.getvalue()) if key in self._labels: msg.set_labels(self._labels[key]) return msg
def __init__(self, info): # info is either an email.Message or # an httplib.HTTPResponse object. if isinstance(info, httplib.HTTPResponse): for key, value in info.getheaders(): self[key.lower()] = value self.status = info.status self['status'] = str(self.status) self.reason = info.reason self.version = info.version elif isinstance(info, email.Message.Message): for key, value in info.items(): self[key.lower()] = value self.status = int(self['status']) else: for key, value in info.iteritems(): self[key.lower()] = value self.status = int(self.get('status', self.status)) self.reason = self.get('reason', self.reason)
def test_getset_charset(self): eq = self.assertEqual msg = Message() eq(msg.get_charset(), None) charset = Charset('iso-8859-1') msg.set_charset(charset) eq(msg['mime-version'], '1.0') eq(msg.get_content_type(), 'text/plain') eq(msg['content-type'], 'text/plain; charset="iso-8859-1"') eq(msg.get_param('charset'), 'iso-8859-1') eq(msg['content-transfer-encoding'], 'quoted-printable') eq(msg.get_charset().input_charset, 'iso-8859-1') # Remove the charset msg.set_charset(None) eq(msg.get_charset(), None) eq(msg['content-type'], 'text/plain') # Try adding a charset when there's already MIME headers present msg = Message() msg['MIME-Version'] = '2.0' msg['Content-Type'] = 'text/x-weird' msg['Content-Transfer-Encoding'] = 'quinted-puntable' msg.set_charset(charset) eq(msg['mime-version'], '2.0') eq(msg['content-type'], 'text/x-weird; charset="iso-8859-1"') eq(msg['content-transfer-encoding'], 'quinted-puntable')
def test_set_param(self): eq = self.assertEqual msg = Message() msg.set_param('charset', 'iso-2022-jp') eq(msg.get_param('charset'), 'iso-2022-jp') msg.set_param('importance', 'high value') eq(msg.get_param('importance'), 'high value') eq(msg.get_param('importance', unquote=False), '"high value"') eq(msg.get_params(), [('text/plain', ''), ('charset', 'iso-2022-jp'), ('importance', 'high value')]) eq(msg.get_params(unquote=False), [('text/plain', ''), ('charset', '"iso-2022-jp"'), ('importance', '"high value"')]) msg.set_param('charset', 'iso-9999-xx', header='X-Jimmy') eq(msg.get_param('charset', header='X-Jimmy'), 'iso-9999-xx')
def test_replace_header(self): eq = self.assertEqual msg = Message() msg.add_header('First', 'One') msg.add_header('Second', 'Two') msg.add_header('Third', 'Three') eq(msg.keys(), ['First', 'Second', 'Third']) eq(msg.values(), ['One', 'Two', 'Three']) msg.replace_header('Second', 'Twenty') eq(msg.keys(), ['First', 'Second', 'Third']) eq(msg.values(), ['One', 'Twenty', 'Three']) msg.add_header('First', 'Eleven') msg.replace_header('First', 'One Hundred') eq(msg.keys(), ['First', 'Second', 'Third', 'First']) eq(msg.values(), ['One Hundred', 'Twenty', 'Three', 'Eleven']) self.assertRaises(KeyError, msg.replace_header, 'Fourth', 'Missing')
def test_invalid_content_type(self): eq = self.assertEqual neq = self.ndiffAssertEqual msg = Message() # RFC 2045, $5.2 says invalid yields text/plain msg['Content-Type'] = 'text' eq(msg.get_content_maintype(), 'text') eq(msg.get_content_subtype(), 'plain') eq(msg.get_content_type(), 'text/plain') # Clear the old value and try something /really/ invalid del msg['content-type'] msg['Content-Type'] = 'foo' eq(msg.get_content_maintype(), 'text') eq(msg.get_content_subtype(), 'plain') eq(msg.get_content_type(), 'text/plain') # Still, make sure that the message is idempotently generated s = StringIO() g = Generator(s) g.flatten(msg) neq(s.getvalue(), 'Content-Type: foo\n\n')
def test_generate(self): # First craft the message to be encapsulated m = Message() m['Subject'] = 'An enclosed message' m.set_payload('Here is the body of the message.\n') r = MIMEMessage(m) r['Subject'] = 'The enclosing message' s = StringIO() g = Generator(s) g.flatten(r) self.assertEqual(s.getvalue(), """\ Content-Type: message/rfc822 MIME-Version: 1.0 Subject: The enclosing message Subject: An enclosed message Here is the body of the message. """)
def test_parser(self): eq = self.assertEquals unless = self.failUnless msg, text = self._msgobj('msg_06.txt') # Check some of the outer headers eq(msg.get_content_type(), 'message/rfc822') # Make sure the payload is a list of exactly one sub-Message, and that # that submessage has a type of text/plain payload = msg.get_payload() unless(isinstance(payload, list)) eq(len(payload), 1) msg1 = payload[0] self.failUnless(isinstance(msg1, Message)) eq(msg1.get_content_type(), 'text/plain') self.failUnless(isinstance(msg1.get_payload(), str)) eq(msg1.get_payload(), '\n') # Test various other bits of the package's functionality
def test_message_from_string_with_class(self): unless = self.failUnless fp = openfile('msg_01.txt') try: text = fp.read() finally: fp.close() # Create a subclass class MyMessage(Message): pass msg = email.message_from_string(text, MyMessage) unless(isinstance(msg, MyMessage)) # Try something more complicated fp = openfile('msg_02.txt') try: text = fp.read() finally: fp.close() msg = email.message_from_string(text, MyMessage) for subpart in msg.walk(): unless(isinstance(subpart, MyMessage))
def test_message_from_file_with_class(self): unless = self.failUnless # Create a subclass class MyMessage(Message): pass fp = openfile('msg_01.txt') try: msg = email.message_from_file(fp, MyMessage) finally: fp.close() unless(isinstance(msg, MyMessage)) # Try something more complicated fp = openfile('msg_02.txt') try: msg = email.message_from_file(fp, MyMessage) finally: fp.close() for subpart in msg.walk(): unless(isinstance(subpart, MyMessage))
def test_get_body_encoding_with_uppercase_charset(self): eq = self.assertEqual msg = Message() msg['Content-Type'] = 'text/plain; charset=UTF-8' eq(msg['content-type'], 'text/plain; charset=UTF-8') charsets = msg.get_charsets() eq(len(charsets), 1) eq(charsets[0], 'utf-8') charset = Charset(charsets[0]) eq(charset.get_body_encoding(), 'base64') msg.set_payload('hello world', charset=charset) eq(msg.get_payload(), 'aGVsbG8gd29ybGQ=\n') eq(msg.get_payload(decode=True), 'hello world') eq(msg['content-transfer-encoding'], 'base64') # Try another one msg = Message() msg['Content-Type'] = 'text/plain; charset="US-ASCII"' charsets = msg.get_charsets() eq(len(charsets), 1) eq(charsets[0], 'us-ascii') charset = Charset(charsets[0]) eq(charset.get_body_encoding(), Encoders.encode_7or8bit) msg.set_payload('hello world', charset=charset) eq(msg.get_payload(), 'hello world') eq(msg['content-transfer-encoding'], '7bit')
def test_parser(self): eq = self.assertEqual msg, text = self._msgobj('msg_06.txt') # Check some of the outer headers eq(msg.get_content_type(), 'message/rfc822') # Make sure the payload is a list of exactly one sub-Message, and that # that submessage has a type of text/plain payload = msg.get_payload() self.assertIsInstance(payload, list) eq(len(payload), 1) msg1 = payload[0] self.assertIsInstance(msg1, Message) eq(msg1.get_content_type(), 'text/plain') self.assertIsInstance(msg1.get_payload(), str) eq(msg1.get_payload(), '\n') # Test various other bits of the package's functionality
def test_message_from_string_with_class(self): fp = openfile('msg_01.txt') try: text = fp.read() finally: fp.close() # Create a subclass class MyMessage(Message): pass msg = email.message_from_string(text, MyMessage) self.assertIsInstance(msg, MyMessage) # Try something more complicated fp = openfile('msg_02.txt') try: text = fp.read() finally: fp.close() msg = email.message_from_string(text, MyMessage) for subpart in msg.walk(): self.assertIsInstance(subpart, MyMessage)
def test__all__(self): module = __import__('email') all = module.__all__ all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def _install_message(self, message): """Format a message and blindly write to self._file.""" from_line = None if isinstance(message, str) and message.startswith('From '): newline = message.find('\n') if newline != -1: from_line = message[:newline] message = message[newline + 1:] else: from_line = message message = '' elif isinstance(message, _mboxMMDFMessage): from_line = 'From ' + message.get_from() elif isinstance(message, email.message.Message): from_line = message.get_unixfrom() # May be None. if from_line is None: from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime()) start = self._file.tell() self._file.write(from_line + os.linesep) self._dump_message(message, self._file, self._mangle_from_) stop = self._file.tell() return (start, stop)