我们从 Python 开源项目中提取了以下 46 个代码示例,用于说明如何使用 email.message 模块。
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The replacement is first stored through ``self.add()`` under a
    temporary key and is then renamed onto the file name built from *key*.

    key -- key of the message being replaced.
    message -- the replacement.  When it is a MaildirMessage, the subdir
        and suffix of the freshly added file are used (and its
        ``get_date()`` becomes the file's modification time); otherwise
        the subdir and suffix of the old file are kept.
    """
    old_subpath = self._lookup(key)
    temp_key = self.add(message)
    temp_subpath = self._lookup(temp_key)
    is_maildir_message = isinstance(message, MaildirMessage)
    if is_maildir_message:
        # temp's subdir and suffix were specified by message.
        dominant_subpath = temp_subpath
    else:
        # temp's subdir and suffix were defaults from add().
        dominant_subpath = old_subpath
    subdir = os.path.dirname(dominant_subpath)
    if self.colon in dominant_subpath:
        suffix = self.colon + dominant_subpath.split(self.colon)[-1]
    else:
        suffix = ''
    self.discard(key)
    tmp_path = os.path.join(self._path, temp_subpath)
    new_path = os.path.join(self._path, subdir, key + suffix)
    if is_maildir_message:
        # Stamp the file while it still lives under its temporary name.
        os.utime(tmp_path,
                 (os.path.getatime(tmp_path), message.get_date()))
    # Nothing may touch the file once it sits at its final path: other
    # programs reading the mailbox could otherwise observe it half-updated.
    os.rename(tmp_path, new_path)
def remove_folder(self, folder):
    """Delete the named folder, which must be empty.

    Raises NotEmptyError when the folder's 'new' or 'cur' directory holds
    an entry whose name does not start with a dot, or when the folder
    contains a subdirectory other than 'new', 'cur' and 'tmp'.  Otherwise
    everything below the folder is removed bottom-up, then the folder.
    """
    folder_path = os.path.join(self._path, '.' + folder)
    delivered = os.listdir(os.path.join(folder_path, 'new'))
    delivered = delivered + os.listdir(os.path.join(folder_path, 'cur'))
    for name in delivered:
        # Only dot-prefixed entries are tolerated; anything else is a message.
        if name[:1] != '.':
            raise NotEmptyError('Folder contains message(s): %s' % folder)
    for name in os.listdir(folder_path):
        if name == 'new' or name == 'cur' or name == 'tmp':
            continue
        if os.path.isdir(os.path.join(folder_path, name)):
            raise NotEmptyError("Folder contains subdirectory '%s': %s" %
                                (folder, name))
    # Bottom-up walk so every directory is empty by the time it is removed.
    for root, dirs, files in os.walk(folder_path, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
    os.rmdir(folder_path)
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The message is read from the file named ``str(key)`` under
    ``self._path``.  When ``self._locked`` is true the file is opened
    read-write and wrapped in ``_lock_file`` / ``_unlock_file`` for the
    duration of the read.

    Raises KeyError when the file does not exist; any other IOError from
    open() propagates unchanged.
    """
    path = os.path.join(self._path, str(key))
    mode = 'r+' if self._locked else 'r'
    try:
        f = open(path, mode)
    except IOError as e:
        # Only "no such file" means "no such message".
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            return f.read()
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        f.close()
def test_get_decoded_payload(self):
    """Check get_payload(decode=True) on every part of msg_10.txt."""
    check = self.assertEqual
    outer = self._msgobj('msg_10.txt')
    # The multipart container itself decodes to None.
    check(outer.get_payload(decode=True), None)
    # Part 1: 7bit.
    seven_bit = outer.get_payload(0)
    check(seven_bit.get_payload(decode=True),
          'This is a 7bit encoded message.\n')
    # Part 2: quoted-printable.
    quopri_part = outer.get_payload(1)
    check(quopri_part.get_payload(decode=True),
          '\xa1This is a Quoted Printable encoded message!\n')
    # Part 3: base64.
    base64_part = outer.get_payload(2)
    check(base64_part.get_payload(decode=True),
          'This is a Base64 encoded message.')
    # Part 4: no Content-Transfer-Encoding: header at all.
    bare_part = outer.get_payload(3)
    check(bare_part.get_payload(decode=True),
          'This has no Content-Transfer-Encoding: header.\n')
def test_invalid_content_type(self):
    """Invalid Content-Type values read back as text/plain but flatten unchanged."""
    same = self.assertEqual
    same_text = self.ndiffAssertEqual
    msg = Message()
    # RFC 2045, $5.2 says invalid yields text/plain
    msg['Content-Type'] = 'text'
    same(msg.get_content_maintype(), 'text')
    same(msg.get_content_subtype(), 'plain')
    same(msg.get_content_type(), 'text/plain')
    # Drop the header and retry with a value that is /really/ invalid.
    del msg['content-type']
    msg['Content-Type'] = 'foo'
    same(msg.get_content_maintype(), 'text')
    same(msg.get_content_subtype(), 'plain')
    same(msg.get_content_type(), 'text/plain')
    # The generator must still emit the header exactly as it was set.
    buf = StringIO()
    writer = Generator(buf)
    writer.flatten(msg)
    same_text(buf.getvalue(), 'Content-Type: foo\n\n')
def test_missing_start_boundary(self):
    """The inner multipart of msg_42.txt must carry exactly one
    StartBoundaryNotFoundDefect."""
    outer = self._msgobj('msg_42.txt')
    # The message structure is:
    #
    # multipart/mixed
    #    text/plain
    #    message/rfc822
    #        multipart/mixed [*]
    #
    # [*] This message is missing its start boundary
    bad = outer.get_payload(1).get_payload(0)
    self.assertEqual(len(bad.defects), 1)
    # assertIsInstance replaces the deprecated failUnless(isinstance(...))
    # and reports the actual type on failure.
    self.assertIsInstance(bad.defects[0],
                          errors.StartBoundaryNotFoundDefect)


# Test RFC 2047 header encoding and decoding
def test_generate(self):
    """Flatten a MIMEMessage that wraps a plain Message and compare the text."""
    # First craft the message to be encapsulated
    m = Message()
    m['Subject'] = 'An enclosed message'
    m.set_payload('Here is the body of the message.\n')
    r = MIMEMessage(m)
    r['Subject'] = 'The enclosing message'
    s = StringIO()
    g = Generator(s)
    g.flatten(r)
    # The expected text is one header per line, a blank line after each
    # header block, then the body.  The backslash right after the opening
    # quotes suppresses the leading newline of the literal.
    self.assertEqual(s.getvalue(), """\
Content-Type: message/rfc822
MIME-Version: 1.0
Subject: The enclosing message

Subject: An enclosed message

Here is the body of the message.
""")
def test_default_type(self):
    """Both top-level parts of msg_30.txt are message/rfc822 containers
    whose first sub-part is text/plain, by default type and content type."""
    fp = openfile('msg_30.txt')
    try:
        msg = email.message_from_file(fp)
    finally:
        fp.close()
    containers = []
    for index in (0, 1):
        container = msg.get_payload(index)
        self.assertEqual(container.get_default_type(), 'message/rfc822')
        self.assertEqual(container.get_content_type(), 'message/rfc822')
        containers.append(container)
    # The inner parts are checked only after both containers passed.
    for container in containers:
        inner = container.get_payload(0)
        self.assertEqual(inner.get_default_type(), 'text/plain')
        self.assertEqual(inner.get_content_type(), 'text/plain')
def test_default_type_with_explicit_container_type(self):
    """Same checks as the default-type test, run against msg_28.txt: both
    containers are message/rfc822 and their first sub-part is text/plain."""
    fp = openfile('msg_28.txt')
    try:
        msg = email.message_from_file(fp)
    finally:
        fp.close()
    containers = []
    for index in (0, 1):
        container = msg.get_payload(index)
        self.assertEqual(container.get_default_type(), 'message/rfc822')
        self.assertEqual(container.get_content_type(), 'message/rfc822')
        containers.append(container)
    # The inner parts are checked only after both containers passed.
    for container in containers:
        inner = container.get_payload(0)
        self.assertEqual(inner.get_default_type(), 'text/plain')
        self.assertEqual(inner.get_content_type(), 'text/plain')
def test_mime_attachments_in_constructor(self):
    """Sub-parts passed via _subparts become the payload, in order."""
    check = self.assertEqual
    first = MIMEText('')
    second = MIMEText('')
    container = MIMEMultipart(_subparts=(first, second))
    check(len(container.get_payload()), 2)
    check(container.get_payload(0), first)
    check(container.get_payload(1), second)


# A general test of parser->model->generator idempotency.  IOW, read a message
# in, parse it into a message object tree, then without touching the tree,
# regenerate the plain text.  The original text and the transformed text
# should be identical.  Note: that we ignore the Unix-From since that may
# contain a changed date.
def test__all__(self):
    """The email package's __all__, once sorted, must match the list below."""
    package = __import__('email')
    # Copy and sort in place: sorted() is avoided for Python 2.3 compatibility.
    exported = package.__all__[:]
    exported.sort()
    expected = [
        # Old names
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
        # new names
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
        ]
    self.assertEqual(exported, expected)
def test_body_line_iterator(self):
    """body_line_iterator yields the expected lines for a simple message
    and for a multipart one."""
    same = self.assertEqual
    same_text = self.ndiffAssertEqual
    # Simple, non-multipart message: the lines rebuild the payload.
    msg = self._msgobj('msg_01.txt')
    lines = list(iterators.body_line_iterator(msg))
    same(len(lines), 6)
    same_text(EMPTYSTRING.join(lines), msg.get_payload())
    # Multipart message: the lines are compared against msg_19.txt.
    msg = self._msgobj('msg_02.txt')
    lines = list(iterators.body_line_iterator(msg))
    same(len(lines), 43)
    fp = openfile('msg_19.txt')
    try:
        same_text(EMPTYSTRING.join(lines), fp.read())
    finally:
        fp.close()
def test_typed_subpart_iterator_default_type(self):
    """typed_subpart_iterator(msg, 'text', 'plain') must yield exactly one
    sub-part of msg_03.txt, whose payload is the short note below."""
    eq = self.assertEqual
    msg = self._msgobj('msg_03.txt')
    it = iterators.typed_subpart_iterator(msg, 'text', 'plain')
    lines = []
    subparts = 0
    for subpart in it:
        subparts += 1
        lines.append(subpart.get_payload())
    eq(subparts, 1)
    # NOTE(review): the line breaks of this literal were rebuilt from a
    # whitespace-collapsed copy -- confirm against the body of msg_03.txt.
    eq(EMPTYSTRING.join(lines), """\
Hi,

Do you like this message?

-Me
""")
def test_get_decoded_payload(self):
    """Check get_payload(decode=True) on every part of msg_10.txt."""
    check = self.assertEqual
    outer = self._msgobj('msg_10.txt')
    # The multipart container itself decodes to None.
    check(outer.get_payload(decode=True), None)
    # Part 1: 7bit.
    seven_bit = outer.get_payload(0)
    check(seven_bit.get_payload(decode=True),
          'This is a 7bit encoded message.\n')
    # Part 2: quoted-printable.
    quopri_part = outer.get_payload(1)
    check(quopri_part.get_payload(decode=True),
          '\xa1This is a Quoted Printable encoded message!\n')
    # Part 3: base64.
    base64_part = outer.get_payload(2)
    check(base64_part.get_payload(decode=True),
          'This is a Base64 encoded message.')
    # Part 4: base64 with a trailing newline, which used to be
    # stripped (issue 7143).
    base64_newline_part = outer.get_payload(3)
    check(base64_newline_part.get_payload(decode=True),
          'This is a Base64 encoded message.\n')
    # Part 5: no Content-Transfer-Encoding: header at all.
    bare_part = outer.get_payload(4)
    check(bare_part.get_payload(decode=True),
          'This has no Content-Transfer-Encoding: header.\n')
def test_as_string(self):
    """as_string() must reproduce msg_01.txt (tabs read as spaces), and
    str(msg) must be the same text behind a leading 'From ' line."""
    same = self.assertEqual
    msg = self._msgobj('msg_01.txt')
    fp = openfile('msg_01.txt')
    try:
        # Per the original author's note (BAW, 30-Mar-2009): the generator
        # is broken with respect to long line breaking and is not
        # idempotent when a parsed header was continued with tabs rather
        # than spaces.  Before bug 1974 was fixed it was broken the other
        # way round (space continuations came back as tabs).  Python 2.x
        # has no good fix; 3.x rewrites this machinery.  Space was picked
        # as the default continuation character because it is less bad
        # for more applications, so tabs are normalised here before
        # comparing.
        expected = fp.read().replace('\t', ' ')
    finally:
        fp.close()
    self.ndiffAssertEqual(expected, msg.as_string())
    rendered = str(msg)
    rendered_lines = rendered.split('\n')
    # str() prepends the Unix-From line; everything after it must match.
    self.assertTrue(rendered_lines[0].startswith('From '))
    same(expected, NL.join(rendered_lines[1:]))
def test_binary_body_with_encode_noop(self):
    """A binary payload stored with encode_noop survives a flatten/parse
    round trip."""
    # Issue 16564: This does not produce an RFC valid message, since to be
    # valid it should have a CTE of binary.  But the below works, and is
    # documented as working this way.
    raw = b'\xfa\xfb\xfc\xfd\xfe\xff'
    original = MIMEApplication(raw, _encoder=encoders.encode_noop)
    self.assertEqual(original.get_payload(), raw)
    self.assertEqual(original.get_payload(decode=True), raw)
    sink = StringIO()
    writer = Generator(sink)
    writer.flatten(original)
    wireform = sink.getvalue()
    reparsed = email.message_from_string(wireform)
    # NOTE(review): this re-checks the original message rather than the
    # reparsed one -- presumably to prove flatten() left it untouched;
    # confirm that is the intent.
    self.assertEqual(original.get_payload(), raw)
    self.assertEqual(reparsed.get_payload(decode=True), raw)


# Test the basic MIMEText class
def test_no_start_boundary(self):
    """The payload of msg_31.txt is compared, as one raw string, with the
    boundary-delimited text below."""
    eq = self.ndiffAssertEqual
    msg = self._msgobj('msg_31.txt')
    # NOTE(review): the line breaks of this literal were rebuilt from a
    # whitespace-collapsed copy -- confirm against the body of msg_31.txt.
    eq(msg.get_payload(), """\
--BOUNDARY
Content-Type: text/plain

message 1

--BOUNDARY
Content-Type: text/plain

message 2

--BOUNDARY--
""")
def test_parser(self):
    """msg_06.txt parses to a message/rfc822 wrapper around a single
    text/plain message whose payload is one newline."""
    same = self.assertEqual
    msg, _text = self._msgobj('msg_06.txt')
    # Outer headers first.
    same(msg.get_content_type(), 'message/rfc822')
    # The payload must be a list holding exactly one sub-Message, and
    # that sub-message must be text/plain.
    parts = msg.get_payload()
    self.assertIsInstance(parts, list)
    same(len(parts), 1)
    inner = parts[0]
    self.assertIsInstance(inner, Message)
    same(inner.get_content_type(), 'text/plain')
    self.assertIsInstance(inner.get_payload(), str)
    same(inner.get_payload(), '\n')


# Test various other bits of the package's functionality
def _install_message(self, message):
    """Format a message and blindly write to self._file.

    A "From " line is written first, followed by the message as dumped by
    ``self._dump_message``.  The From line comes from the message itself
    when it provides one and otherwise defaults to a MAILER-DAEMON line
    stamped with the current GMT time.

    Returns the (start, stop) positions reported by ``self._file.tell()``
    before and after the write.
    """
    envelope = None
    if isinstance(message, str) and message.startswith('From '):
        # Split the leading From line off; with no newline present the
        # whole string is the From line and the body is empty.
        envelope, _newline, message = message.partition('\n')
    elif isinstance(message, _mboxMMDFMessage):
        envelope = 'From ' + message.get_from()
    elif isinstance(message, email.message.Message):
        envelope = message.get_unixfrom()   # May be None.
    if envelope is None:
        envelope = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
    begin = self._file.tell()
    self._file.write(envelope + os.linesep)
    self._dump_message(message, self._file, self._mangle_from_)
    end = self._file.tell()
    return (begin, end)
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The file named ``str(key)`` under ``self._path`` is truncated and
    rewritten with *message*.  When ``self._locked`` is true the file is
    locked around the rewrite.  For an MHMessage the sequences are
    written as well, through ``self._dump_sequences``.

    Raises KeyError when the file does not exist; any other IOError from
    open() propagates unchanged.
    """
    path = os.path.join(self._path, str(key))
    try:
        f = open(path, 'rb+')
    except IOError as e:
        # Only "no such file" means "no such message".
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            # Truncate through a second, short-lived descriptor; f itself
            # stays open for the rewrite.
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        _sync_close(f)
def __init__(self, info):
    """Fill this mapping from *info*, lower-casing every header name.

    info -- an http.client.HTTPResponse, an email.message.Message, or any
        object with an ``items()`` method yielding (name, value) pairs.

    For an HTTPResponse, repeated headers are joined with ', ' and the
    status, reason and version are copied from the response.  For a
    Message the 'status' header is required; for any other mapping it
    falls back to the current ``self.status``.
    """
    if isinstance(info, http.client.HTTPResponse):
        for name, value in info.getheaders():
            name = name.lower()
            earlier = self.get(name)
            if earlier is not None:
                # Fold a repeated header into one comma-separated value.
                value = ', '.join((earlier, value))
            self[name] = value
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
        return
    if isinstance(info, email.message.Message):
        for name, value in list(info.items()):
            self[name.lower()] = value
        self.status = int(self['status'])
        return
    for name, value in info.items():
        self[name.lower()] = value
    self.status = int(self.get('status', self.status))
def get_last_message(self, tablename):
    """Return the last sequence number of the mailbox mapped to *tablename*.

    The number is read from the first item of the second element of
    ``self.connection.select(...)``; a value of 0 is reported as 1.
    Returns None, after logging at debug level, when the lookup or the
    conversion raises IndexError, ValueError, TypeError or KeyError.
    """
    # request mailbox list to the server if needed.
    if not isinstance(self.connection.mailbox_names, dict):
        self.get_mailboxes()
    try:
        select = self.connection.select
        response = select(self.connection.mailbox_names[tablename])
        count = int(response[1][0])
    except (IndexError, ValueError, TypeError, KeyError):
        error = sys.exc_info()[1]
        self.db.logger.debug("Error retrieving the last mailbox" +
                             " sequence number. %s" % str(error))
        return None
    # Last message must be a positive integer
    return 1 if count == 0 else count
def EQ(self, first, second):
    """Build the search term for ``first == second``.

    The field name is translated through ``self.search_fields``:
    "MESSAGE" gives the expanded value alone, "UID" a "UID ..." term,
    "DATE" an "ON ..." term, and a name found in ``self.flags`` values
    gives that name upper-cased without its first character, prefixed
    with "NOT " when *second* is false.

    Raises Exception("Operation not supported") for a field mapped to
    None or to any other name.
    """
    name = self.search_fields[first.name]
    if name is None:
        raise Exception("Operation not supported")
    if name == "MESSAGE":
        # query by message sequence number
        return "%s" % self.expand(second)
    if name == "UID":
        return "UID %s" % self.expand(second)
    if name == "DATE":
        return "ON %s" % self.convert_date(second)
    if name in self.flags.values():
        keyword = name.upper()[1:]
        if second:
            return "%s" % keyword
        return "NOT %s" % keyword
    raise Exception("Operation not supported")
def get_one_charset_payload(msg):
    """
    Returns one charset and payload from a mail, multipart or not.

    For a multipart message, the first part (in ``msg.walk()`` order) that
    declares a charset is used, and that part's decoded payload is returned.

    Parameters
    ----------
    msg: email.message

    Returns
    -------
    string: charset of mail
    string: payload, as returned by ``get_payload(decode=True)``

    Raises
    ------
    ValueError
        If no charset is declared, either on the message itself or, for a
        multipart message, on any of its parts.
    """
    candidates = msg.walk() if msg.is_multipart() else [msg]
    for part in candidates:
        charset = part.get_content_charset()
        if charset is not None:
            return charset, part.get_payload(decode=True)
    # Never fall through to an implicit None: callers unpack two values.
    raise ValueError('Charset is None, message too weird.')
def add(self, message):
    """Store *message* and return the key assigned to it.

    Abstract here: always raises NotImplementedError.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
def remove(self, key):
    """Delete the message stored under *key*; KeyError if there is none.

    Abstract here: always raises NotImplementedError.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
def discard(self, key):
    """Remove the message stored under *key*, ignoring a missing key."""
    try:
        self.remove(key)
    except KeyError:
        # Nothing stored under this key: there is nothing to do.
        return
def __setitem__(self, key, message):
    """Replace the message stored under *key*; KeyError if there is none.

    Abstract here: always raises NotImplementedError.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
def get(self, key, default=None):
    """Return the message stored under *key*, or *default* when
    ``__getitem__`` raises KeyError."""
    try:
        message = self.__getitem__(key)
    except KeyError:
        message = default
    return message
def iteritems(self):
    """Yield a (key, message) tuple for every key from ``iterkeys()``.

    A key whose lookup raises KeyError is skipped.
    """
    for key in self.iterkeys():
        try:
            message = self[key]
        except KeyError:
            continue
        else:
            # Yield outside the try body so only the lookup is guarded.
            yield (key, message)
def items(self):
    """Return every (key, message) tuple from ``iteritems()`` as a list.

    Memory intensive: all messages are held at once.
    """
    pairs = [pair for pair in self.iteritems()]
    return pairs
def has_key(self, key):
    """Tell whether a message is stored under *key* (True or False).

    Abstract here: always raises NotImplementedError.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
def pop(self, key, default=None):
    """Remove the message stored under *key* and return it.

    Returns *default* when the lookup raises KeyError.
    """
    try:
        message = self[key]
    except KeyError:
        return default
    else:
        self.discard(key)
        return message