我们从 Python 开源项目中提取了以下 36 个代码示例，用于说明如何使用 email.message_from_string()。
def test_split_long_continuation(self): eq = self.ndiffAssertEqual msg = email.message_from_string("""\ Subject: bug demonstration \t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 \tmore text test """) sfp = StringIO() g = Generator(sfp) g.flatten(msg) eq(sfp.getvalue(), """\ Subject: bug demonstration \t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 \tmore text test """)
def test_boundary_with_leading_space(self): eq = self.assertEqual msg = email.message_from_string('''\ MIME-Version: 1.0 Content-Type: multipart/mixed; boundary=" XXXX" -- XXXX Content-Type: text/plain -- XXXX Content-Type: text/plain -- XXXX-- ''') self.failUnless(msg.is_multipart()) eq(msg.get_boundary(), ' XXXX') eq(len(msg.get_payload()), 2)
def test_message_from_string_with_class(self):
    # message_from_string() must build the root message -- and every subpart
    # reached by walk() -- from the class passed as its second argument.
    def read_fixture(name):
        # Read a whole fixture file, closing it even if read() fails.
        fp = openfile(name)
        try:
            return fp.read()
        finally:
            fp.close()

    # Create a subclass
    class MyMessage(Message):
        pass

    msg = email.message_from_string(read_fixture('msg_01.txt'), MyMessage)
    # assertIsInstance replaces failUnless(isinstance(...)): the failUnless
    # alias is gone from current unittest releases.
    self.assertIsInstance(msg, MyMessage)
    # Try something more complicated
    msg = email.message_from_string(read_fixture('msg_02.txt'), MyMessage)
    for subpart in msg.walk():
        self.assertIsInstance(subpart, MyMessage)
def test__all__(self): module = __import__('email') # Can't use sorted() here due to Python 2.3 compatibility all = module.__all__[:] all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def test_get_param(self): eq = self.assertEqual msg = email.message_from_string( "X-Header: foo=one; bar=two; baz=three\n") eq(msg.get_param('bar', header='x-header'), 'two') eq(msg.get_param('quuz', header='x-header'), None) eq(msg.get_param('quuz'), None) msg = email.message_from_string( 'X-Header: foo; bar="one"; baz=two\n') eq(msg.get_param('foo', header='x-header'), '') eq(msg.get_param('bar', header='x-header'), 'one') eq(msg.get_param('baz', header='x-header'), 'two') # XXX: We are not RFC-2045 compliant! We cannot parse: # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"' # msg.get_param("weird") # yet.
# check_email(self, uid, email_data, current_time)
#
# Parses email_data[0][1] with email.message_from_string() and converts the
# message's "Date" header to a timestamp via email.utils.parsedate_tz() and
# email.utils.mktime_tz().  When current_time - mail_time is below
# self.window * 60 and the message is multipart, each non-multipart part that
# carries a Content-Disposition header has its decoded payload split on
# self.delimiter; a two-element result is stripped into (key, value) and
# passed to self.execute_hook().  A falsy hook result is logged and the
# message is handed to self.add_ignore(mail, uid).  The final log call and
# add_ignore() call record the uid as expired.
#
# NOTE(review): the original indentation is lost (the whole method sits on
# one line), so which branch owns the `return`, and whether the trailing
# "expired" handling also runs after a hook was executed, cannot be determined
# here -- confirm against the upstream source before re-indenting.
def check_email(self, uid, email_data, current_time): mail = email.message_from_string(email_data[0][1]) mail_time = email.utils.mktime_tz(email.utils.parsedate_tz(mail["Date"])) if current_time - mail_time < self.window * 60: if mail.get_content_maintype() == "multipart": for part in mail.walk(): if part.get_content_maintype() != 'multipart' and part.get('Content-Disposition') is not None: response = part.get_payload(decode=True) response = response.split(self.delimiter) if len(response) == 2: key = response[0].strip() value = response[1].strip() # If hook is not valid then also ignore if not self.execute_hook(key, value): self.logger.info("Adding failed hook with uid: {uid} to ignore.".format(uid=uid)) self.add_ignore(mail, uid) return # Clean_hook_dict will take care of this later self.logger.info("Email with uid: {uid} is expired, ignoring in next check".format(uid=uid)) # Add uid to ignore if uid is expired so it knows not to request it next cycle self.add_ignore(mail, uid)
# test_split_long_continuation (variant)
#
# Parses a message whose Subject header has two "\t"-prefixed continuation
# lines, flattens it through Generator into a StringIO, and compares the
# output with an expected text whose continuation lines no longer show the
# "\t" escape.
#
# NOTE(review): this listing has lost its line breaks and leading whitespace,
# so the continuation whitespace expected in the output (a space versus a
# literal tab) cannot be recovered here, and it decides whether the test
# passes -- confirm against the upstream test before re-indenting.
def test_split_long_continuation(self): eq = self.ndiffAssertEqual msg = email.message_from_string("""\ Subject: bug demonstration \t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 \tmore text test """) sfp = StringIO() g = Generator(sfp) g.flatten(msg) eq(sfp.getvalue(), """\ Subject: bug demonstration 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 more text test """)
def test_binary_body_with_encode_7or8bit(self): # Issue 17171. bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff' msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit) # Treated as a string, this will be invalid code points. self.assertEqual(msg.get_payload(), bytesdata) self.assertEqual(msg.get_payload(decode=True), bytesdata) self.assertEqual(msg['Content-Transfer-Encoding'], '8bit') s = StringIO() g = Generator(s) g.flatten(msg) wireform = s.getvalue() msg2 = email.message_from_string(wireform) self.assertEqual(msg.get_payload(), bytesdata) self.assertEqual(msg2.get_payload(decode=True), bytesdata) self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
def test_boundary_with_leading_space(self): eq = self.assertEqual msg = email.message_from_string('''\ MIME-Version: 1.0 Content-Type: multipart/mixed; boundary=" XXXX" -- XXXX Content-Type: text/plain -- XXXX Content-Type: text/plain -- XXXX-- ''') self.assertTrue(msg.is_multipart()) eq(msg.get_boundary(), ' XXXX') eq(len(msg.get_payload()), 2)
def test_message_from_string_with_class(self):
    # The class given to message_from_string() must be used for the top-level
    # message and for every part that walk() yields.
    def slurp(filename):
        # Return the fixture's full text; the file is closed in all cases.
        handle = openfile(filename)
        try:
            return handle.read()
        finally:
            handle.close()

    simple_text = slurp('msg_01.txt')

    # Create a subclass
    class MyMessage(Message):
        pass

    simple = email.message_from_string(simple_text, MyMessage)
    self.assertIsInstance(simple, MyMessage)

    # Try something more complicated
    nested_text = slurp('msg_02.txt')
    nested = email.message_from_string(nested_text, MyMessage)
    for part in nested.walk():
        self.assertIsInstance(part, MyMessage)
def test__all__(self): module = __import__('email') all = module.__all__ all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def test_rfc2231_unencoded_then_encoded_segments(self): eq = self.assertEqual m = """\ Content-Type: application/x-foo; \tname*0=\"us-ascii'en-us'My\"; \tname*1*=\" Document\"; \tname*2*=\" For You\" """ msg = email.message_from_string(m) charset, language, s = msg.get_param('name') eq(charset, 'us-ascii') eq(language, 'en-us') eq(s, 'My Document For You') # Tests to ensure that signed parts of an email are completely preserved, as # required by RFC1847 section 2.1. Note that these are incomplete, because the # email package does not currently always preserve the body. See issue 1670765.
def parse(self, lines):
    """
    Read one metadata section from ``lines`` into this object.

    Records ``lines.lineno`` in ``self._lineno``, then appends each value
    returned by ``lines.next()`` to ``self._lines`` until a falsy value (an
    empty line or EOF) is returned.  The collected lines are parsed like email
    headers and stored in ``self._raw`` with lower-cased names and stripped
    values.  If a ``tags`` field is present it is split on runs of spaces,
    commas and tabs and merged into ``self.tags``.
    """
    # Remember where this section starts.
    self._lineno = lines.lineno

    # Consume lines up to (and including) the first empty line or EOF marker.
    current = lines.next()
    while current:
        self._lines.append(current)
        current = lines.next()

    # Parse fields in the same way as email headers
    import email
    header_text = "\n".join(self._lines)
    for name, value in email.message_from_string(header_text).items():
        self._raw[name.lower()] = value.strip()

    # Well-known field: tags
    tag_field = self._raw.get("tags", None)
    if tag_field is not None:
        self.tags.update(re.split("[ ,\t]+", tag_field))
def __init__(self, message=None):
    """Set up this instance from ``message``.

    ``message`` may be None (plain initialisation), an
    ``email.message.Message`` (deep-copied; if it is also a ``Message`` of
    this module its ``_explain_to`` is called with the new instance), a
    ``str`` (parsed with ``email.message_from_string``) or any object with a
    ``read`` attribute (parsed with ``email.message_from_file``).

    Raises TypeError for any other argument type.
    """
    if message is None:
        email.message.Message.__init__(self)
        return

    if isinstance(message, email.message.Message):
        self._become_message(copy.deepcopy(message))
        if isinstance(message, Message):
            message._explain_to(self)
        return

    if isinstance(message, str):
        self._become_message(email.message_from_string(message))
        return

    if hasattr(message, "read"):
        self._become_message(email.message_from_file(message))
        return

    raise TypeError('Invalid message type: %s' % type(message))
def test_bad_param(self): msg = email.message_from_string("Content-Type: blarg; baz; boo\n") self.assertEqual(msg.get_param('baz'), '')
def test_missing_filename(self): msg = email.message_from_string("From: foo\n") self.assertEqual(msg.get_filename(), None)
def test_bogus_filename(self): msg = email.message_from_string( "Content-Disposition: blarg; filename\n") self.assertEqual(msg.get_filename(), '')
def test_missing_boundary(self): msg = email.message_from_string("From: foo\n") self.assertEqual(msg.get_boundary(), None)
def test_get_params(self): eq = self.assertEqual msg = email.message_from_string( 'X-Header: foo=one; bar=two; baz=three\n') eq(msg.get_params(header='x-header'), [('foo', 'one'), ('bar', 'two'), ('baz', 'three')]) msg = email.message_from_string( 'X-Header: foo; bar=one; baz=two\n') eq(msg.get_params(header='x-header'), [('foo', ''), ('bar', 'one'), ('baz', 'two')]) eq(msg.get_params(), None) msg = email.message_from_string( 'X-Header: foo; bar="one"; baz=two\n') eq(msg.get_params(header='x-header'), [('foo', ''), ('bar', 'one'), ('baz', 'two')])
def test_get_param_with_semis_in_quotes(self): msg = email.message_from_string( 'Content-Type: image/pjpeg; name="Jim&&Jill"\n') self.assertEqual(msg.get_param('name'), 'Jim&&Jill') self.assertEqual(msg.get_param('name', unquote=False), '"Jim&&Jill"')
def test_has_key(self): msg = email.message_from_string('Header: exists') self.failUnless(msg.has_key('header')) self.failUnless(msg.has_key('Header')) self.failUnless(msg.has_key('HEADER')) self.failIf(msg.has_key('headeri'))
def test_another_long_multiline_header(self): eq = self.ndiffAssertEqual m = '''\ Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with Microsoft SMTPSVC(5.0.2195.4905); \tWed, 16 Oct 2002 07:41:11 -0700''' msg = email.message_from_string(m) eq(msg.as_string(), '''\ Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with \tMicrosoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700 ''')
def test_message_from_string(self):
    # Parsing the text of msg_01.txt and flattening the result must give back
    # exactly the same text.
    fixture = openfile('msg_01.txt')
    try:
        text = fixture.read()
    finally:
        fixture.close()
    parsed = email.message_from_string(text)
    out = StringIO()
    # Don't wrap/continue long headers since we're trying to test
    # idempotency.
    Generator(out, maxheaderlen=0).flatten(parsed)
    self.assertEqual(text, out.getvalue())
def test_strip_line_feed_and_carriage_return_in_headers(self): eq = self.assertEqual # For [ 1002475 ] email message parser doesn't handle \r\n correctly value1 = 'text' value2 = 'more text' m = 'Header: %s\r\nNext-Header: %s\r\n\r\nBody\r\n\r\n' % ( value1, value2) msg = email.message_from_string(m) eq(msg.get('Header'), value1) eq(msg.get('Next-Header'), value2)
def test_rfc2822_header_syntax(self): eq = self.assertEqual m = '>From: foo\nFrom: bar\n!"#QUX;~: zoo\n\nbody' msg = email.message_from_string(m) eq(len(msg.keys()), 3) keys = msg.keys() keys.sort() eq(keys, ['!"#QUX;~', '>From', 'From']) eq(msg.get_payload(), 'body')
def test_rfc2822_one_character_header(self): eq = self.assertEqual m = 'A: first header\nB: second header\nCC: third header\n\nbody' msg = email.message_from_string(m) headers = msg.keys() headers.sort() eq(headers, ['A', 'B', 'CC']) eq(msg.get_payload(), 'body')