我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用email.parser.Parser()。
def wheel_version(source_dir): """ Return the Wheel-Version of an extracted wheel, if possible. Otherwise, return False if we couldn't parse / extract it. """ try: dist = [d for d in pkg_resources.find_on_path(None, source_dir)][0] wheel_data = dist.get_metadata('WHEEL') wheel_data = Parser().parsestr(wheel_data) version = wheel_data['Wheel-Version'].strip() version = tuple(map(int, version.split('.'))) return version except: return False
def test_boundary_without_trailing_newline(self): m = Parser().parsestr("""\ Content-Type: multipart/mixed; boundary="===============0012394164==" MIME-Version: 1.0 --===============0012394164== Content-Type: image/file1.jpg MIME-Version: 1.0 Content-Transfer-Encoding: base64 YXNkZg== --===============0012394164==--""") self.assertEquals(m.get_payload(0).get_payload(), 'YXNkZg==') # Test some badly formatted messages
def extractEmailBody(filename): with open(filename, 'r') as fin: mail = fin.readlines() mail = ''.join(mail) msg = Parser().parsestr(mail) metadata = { 'To': msg['to'], 'From': msg['from'], 'Cc': msg['cc'], 'Bcc': msg['bcc'], 'Subject': msg['subject'], 'Date': msg['date'], 'References': msg['references'] } return msg.get_payload(), metadata # Removes everything after 'Original Message' or '<somebody> wrote:' # TODO: Fix this for replies AFTER original messages
def test_boundary_without_trailing_newline(self): m = Parser().parsestr("""\ Content-Type: multipart/mixed; boundary="===============0012394164==" MIME-Version: 1.0 --===============0012394164== Content-Type: image/file1.jpg MIME-Version: 1.0 Content-Transfer-Encoding: base64 YXNkZg== --===============0012394164==--""") self.assertEqual(m.get_payload(0).get_payload(), 'YXNkZg==') # Test some badly formatted messages
def test_pkginfo_mangle_from(tmpdir): """Test that write_pkginfo() will not prepend a ">" to a line starting with "From".""" metadata = """\ Metadata-Version: 2.0 Name: foo From blahblah ==== Test ==== """ message = Parser().parsestr(metadata) pkginfo_file = tmpdir.join('PKGINFO') write_pkg_info(str(pkginfo_file), message) assert pkginfo_file.read_text('ascii') == metadata
def email_analyse(inputfile, to_email_list, from_email_list, email_body): with open(inputfile, "r") as f: data = f.read() email = Parser().parsestr(data) if email['to']: email_to = email['to'] email_to = email_to.replace("\n", "") email_to = email_to.replace("\t", "") email_to = email_to.replace(" ", "") email_to = email_to.split(",") for email_to_1 in email_to: to_email_list.append(email_to_1) from_email_list.append(email['from']) email_body.append(email.get_payload())
def email_analyse(inputfile, to_email_list, from_email_list, email_body): with open(inputfile, "r") as f: data = f.read() email = Parser().parsestr(data) if email['to']: email_to = email['to'] email_to = email_to.replace("\n", "") email_to = email_to.replace("\t", "") email_to = email_to.replace(" ", "") email_to = email_to.split(",") for email_to_1 in email_to: to_email_list.append(email_to_1) from_email_list.append(email['from'])
def test__all__(self): module = __import__('email') # Can't use sorted() here due to Python 2.3 compatibility all = module.__all__[:] all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def testExtractOpenPGPHeader(self): """ Test the OpenPGP header key extraction """ KEYURL = "https://leap.se/key.txt" OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) message = Parser().parsestr(self.EMAIL) message.add_header("OpenPGP", OpenPGP) self.fetcher._keymanager.fetch_key = Mock( return_value=defer.succeed(None)) def fetch_key_called(ret): self.fetcher._keymanager.fetch_key.assert_called_once_with( ADDRESS_2, KEYURL) d = self._create_incoming_email(message.as_string()) d.addCallback( lambda email: self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) d.addCallback(lambda _: self.fetcher.fetch()) d.addCallback(fetch_key_called) return d
def testExtractOpenPGPHeaderInvalidUrl(self): """ Test the OpenPGP header key extraction """ KEYURL = "https://someotherdomain.com/key.txt" OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) message = Parser().parsestr(self.EMAIL) message.add_header("OpenPGP", OpenPGP) self.fetcher._keymanager.fetch_key = Mock() def fetch_key_called(ret): self.assertFalse(self.fetcher._keymanager.fetch_key.called) d = self._create_incoming_email(message.as_string()) d.addCallback( lambda email: self._mock_soledad_get_from_index(fields.JUST_MAIL_IDX, [email])) d.addCallback(lambda _: self.fetcher.fetch()) d.addCallback(fetch_key_called) return d
def testStripLeapHeaders(self): ENC_HEADER = "fake encryption header" SIG_HEADER = "fake signature header" message = Parser().parsestr(self.EMAIL) message.add_header("X-Leap-Encryption", ENC_HEADER) message.add_header("X-Leap-Signature", SIG_HEADER) self.fetcher._add_message_locally = Mock() def check_headers(_): self.assertTrue(self.fetcher._add_message_locally.called, "The message was not added to soledad") _, data = self.fetcher._add_message_locally.call_args[0][0] msg = Parser().parsestr(data) self.assertNotEqual(msg.get('X-Leap-Encryption', ''), ENC_HEADER) self.assertNotEqual(msg.get('X-Leap-Signature', ''), SIG_HEADER) d = self._do_fetch(message.as_string()) d.addCallback(check_headers) return d