我们从Python开源项目中提取了以下44个代码示例，用于说明如何使用email.message.Message.get_payload()。
def _get_upload_content(field_storage):
    """Parse the decoded content of an uploaded blob as an email message.

    The raw upload body is placed in a scratch ``email.message.Message``
    that carries the field storage's ``Content-Transfer-Encoding`` header,
    so that ``get_payload(decode=True)`` undoes that encoding.  The decoded
    text is then parsed into a new message.

    Args:
      field_storage: cgi.FieldStorage that represents the uploaded blob.

    Returns:
      The ``email.message.Message`` parsed from the decoded upload content.
    """
    transfer_encoding = field_storage.headers.getheader(
        'Content-Transfer-Encoding', '')
    raw_body = field_storage.file.read()

    # Scratch message used only to apply the transfer decoding.
    wrapper = email.message.Message()
    wrapper.add_header('content-transfer-encoding', transfer_encoding)
    wrapper.set_payload(raw_body)

    return email.message_from_string(wrapper.get_payload(decode=True))
def test_set_MM(self):
    # Overwrite a stored message using MaildirMessage instances
    box = self._box

    # Start from a message that carries the 'T' and 'P' flags.
    flagged = mailbox.MaildirMessage(self._template % 0)
    flagged.set_flags('TP')
    key = box.add(flagged)
    stored = box.get_message(key)
    self.assertEqual(stored.get_subdir(), 'new')
    self.assertEqual(stored.get_flags(), 'PT')

    # Replacing it with a flagless MaildirMessage is expected to clear
    # the flags and swap the payload.
    unflagged = mailbox.MaildirMessage(self._template % 1)
    box[key] = unflagged
    stored = box.get_message(key)
    self.assertEqual(stored.get_subdir(), 'new')
    self.assertEqual(stored.get_flags(), '')
    self.assertEqual(stored.get_payload(), '1\n')

    # Store a message with info '2,S', then overwrite the key with a
    # plain string; the assertions expect the 'S' flag to remain while
    # the payload becomes that of the string.
    with_info = mailbox.MaildirMessage(self._template % 2)
    with_info.set_info('2,S')
    box[key] = with_info
    box[key] = self._template % 3
    stored = box.get_message(key)
    self.assertEqual(stored.get_subdir(), 'new')
    self.assertEqual(stored.get_flags(), 'S')
    self.assertEqual(stored.get_payload(), '3\n')
def test_pop(self):
    # pop() hands back the message for a key and removes that key
    box = self._box

    first = box.add(self._template % 0)
    self.assertIn(first, box)
    second = box.add(self._template % 1)
    self.assertIn(second, box)

    # Popping the first key must leave the second one in place.
    popped = box.pop(first)
    self.assertEqual(popped.get_payload(), '0\n')
    self.assertNotIn(first, box)
    self.assertIn(second, box)

    # A key added later can be popped while the second key stays.
    third = box.add(self._template % 2)
    self.assertIn(third, box)
    popped = box.pop(third)
    self.assertEqual(popped.get_payload(), '2\n')
    self.assertNotIn(third, box)
    self.assertIn(second, box)

    # Popping the last remaining key empties the mailbox.
    popped = box.pop(second)
    self.assertEqual(popped.get_payload(), '1\n')
    self.assertNotIn(second, box)
    self.assertEqual(len(box), 0)
def test_visible(self):
    # Get, set, and update visible headers
    msg = mailbox.BabylMessage(_sample_message)

    # A fresh message is expected to expose no visible headers or payload.
    visible = msg.get_visible()
    self.assertEqual(visible.keys(), [])
    self.assertIsNone(visible.get_payload())

    # Editing the returned object alone must not affect the message;
    # the change only shows up after set_visible().
    visible['User-Agent'] = 'FooBar 1.0'
    visible['X-Whatever'] = 'Blah'
    self.assertEqual(msg.get_visible().keys(), [])
    msg.set_visible(visible)
    visible = msg.get_visible()
    # assertEqual instead of assertTrue(a == b): a failure then reports
    # both values rather than just "False is not true".
    self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
    self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
    self.assertEqual(visible['X-Whatever'], 'Blah')
    self.assertIsNone(visible.get_payload())

    # update_visible() must leave the previously fetched object alone;
    # only a new get_visible() call reflects the updated headers.
    msg.update_visible()
    self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
    self.assertIsNone(visible.get_payload())
    visible = msg.get_visible()
    self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
                                      'Subject'])
    for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
        self.assertEqual(visible[header], msg[header])
def test_visible(self):
    # Exercise get_visible(), set_visible() and update_visible()
    babyl = mailbox.BabylMessage(_sample_message)
    custom_names = ['User-Agent', 'X-Whatever']
    updated_names = ['User-Agent', 'Date', 'From', 'To', 'Subject']

    # Nothing is visible on a freshly built message.
    shown = babyl.get_visible()
    self.assertEqual(shown.keys(), [])
    self.assertIsNone(shown.get_payload())

    # Headers added to the returned object stay local until they are
    # handed back through set_visible().
    shown['User-Agent'] = 'FooBar 1.0'
    shown['X-Whatever'] = 'Blah'
    self.assertEqual(babyl.get_visible().keys(), [])
    babyl.set_visible(shown)
    shown = babyl.get_visible()
    self.assertEqual(shown.keys(), custom_names)
    self.assertEqual(shown['User-Agent'], 'FooBar 1.0')
    self.assertEqual(shown['X-Whatever'], 'Blah')
    self.assertIsNone(shown.get_payload())

    # The object fetched before update_visible() keeps its old headers.
    babyl.update_visible()
    self.assertEqual(shown.keys(), custom_names)
    self.assertIsNone(shown.get_payload())

    # A fresh fetch reports the updated header set, with values taken
    # from the message itself.
    shown = babyl.get_visible()
    self.assertEqual(shown.keys(), updated_names)
    for name in updated_names:
        self.assertEqual(shown[name], babyl[name])
def test_generate_email(self):
    # _generate_email() should fill in the address/subject headers and
    # produce a text/plain part followed by a text/html part.
    message = mail_request_handler.MailRequestHandler._generate_email(
        'to', 'from', 'cc', 'subject', 'body')

    expected_headers = (
        ('From', 'from'),
        ('To', 'to'),
        ('Cc', 'cc'),
        ('Subject', 'subject'),
    )
    for header_name, header_value in expected_headers:
        self.assertEqual(header_value, message[header_name])

    text_part, html_part = message.get_payload()
    expected_parts = (
        (text_part, 'text/plain'),
        (html_part, 'text/html'),
    )
    for part, mime_type in expected_parts:
        self.assertEqual(mime_type, part.get_content_type())
        self.assertEqual('utf-8', part.get_content_charset())
        body = part.get_payload()
        transfer_encoding = part['content-transfer-encoding']
        # Anything other than 7bit is decoded with the codec named by
        # the part's Content-Transfer-Encoding header.
        if transfer_encoding != '7bit':
            body = body.decode(transfer_encoding)
        self.assertEqual('body', body)
def _check_sample(self, msg):
    # Verify that msg is a mailbox.Message matching the sample message
    for expected_type in (email.message.Message, mailbox.Message):
        self.assertIsInstance(msg, expected_type)

    # Every sample header value must be among the values for its name.
    for name, expected_value in _sample_headers.iteritems():
        self.assertIn(expected_value, msg.get_all(name))

    self.assertTrue(msg.is_multipart())
    self.assertEqual(len(msg.get_payload()), len(_sample_payloads))

    # Sub-parts are plain email messages, not mailbox.Message instances.
    for index, expected_payload in enumerate(_sample_payloads):
        part = msg.get_payload(index)
        self.assertIsInstance(part, email.message.Message)
        self.assertNotIsInstance(part, mailbox.Message)
        self.assertEqual(part.get_payload(), expected_payload)
def test_get(self):
    # Retrieve messages using get()
    key0 = self._box.add(self._template % 0)
    msg = self._box.get(key0)
    self.assertEqual(msg['from'], 'foo')
    self.assertEqual(msg.get_payload(), '0\n')

    # An unknown key yields None, or the supplied default.
    self.assertIsNone(self._box.get('foo'))
    self.assertFalse(self._box.get('foo', False))
    self._box.close()

    # Reopen the mailbox with rfc822.Message as the message factory; the
    # returned message then exposes the underlying file as msg.fp.
    self._box = self._factory(self._path, factory=rfc822.Message)
    key1 = self._box.add(self._template % 1)
    msg = self._box.get(key1)
    try:
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.fp.read(), '1' + os.linesep)
    finally:
        # Close the file even when an assertion above fails, so a failing
        # test does not leak the open handle.
        msg.fp.close()
def test_getitem(self):
    # Look messages up with subscript syntax (__getitem__)
    box = self._box
    key = box.add(self._template % 0)

    fetched = box[key]
    self.assertEqual(fetched['from'], 'foo')
    self.assertEqual(fetched.get_payload(), '0\n')

    # A key that was never added raises KeyError.
    with self.assertRaises(KeyError):
        box['foo']

    # So does a key that has been discarded.
    box.discard(key)
    with self.assertRaises(KeyError):
        box[key]
def test_get_message(self):
    # get_message() returns mailbox.Message representations
    box = self._box
    template_key = box.add(self._template % 0)
    sample_key = box.add(_sample_message)

    from_template = box.get_message(template_key)
    self.assertIsInstance(from_template, mailbox.Message)
    self.assertEqual(from_template['from'], 'foo')
    self.assertEqual(from_template.get_payload(), '0\n')

    # The multipart sample is verified by the shared helper.
    from_sample = box.get_message(sample_key)
    self._check_sample(from_sample)
def _check_iteration(self, method, do_keys, do_values, repetitions=10):
    # Exercise an iteration method of the mailbox.
    #
    # method:      zero-argument callable returning the iterable under test
    # do_keys:     true when the iterable yields keys
    # do_values:   true when the iterable yields messages
    # repetitions: number of messages added before iterating
    #
    # When both flags are set (or both are unset) the iterable is
    # unpacked as (key, message) pairs.

    # The mailbox must start out empty.
    for _ in method():
        self.fail("Not empty")

    values = [self._template % i for i in xrange(repetitions)]
    keys = [self._box.add(text) for text in values]

    if do_keys and not do_values:
        returned_keys = list(method())
    elif do_values and not do_keys:
        returned_values = list(method())
    else:
        pairs = list(method())
        returned_keys = [key for key, _ in pairs]
        returned_values = [message for _, message in pairs]

    if do_keys:
        # Same keys, each exactly once, in any order.
        self.assertEqual(len(keys), len(returned_keys))
        self.assertEqual(set(keys), set(returned_keys))
    if do_values:
        for message in returned_values:
            self.assertEqual(message['from'], 'foo')
            self.assertLess(int(message.get_payload()), repetitions)
        self.assertEqual(len(values), len(returned_values))
def test_popitem(self, iterations=10):
    # Get and remove an arbitrary (key, message) using popitem()
    #
    # iterations: how many messages to add and then pop.  The loops now
    # honour this parameter instead of a hard-coded 10; the default keeps
    # the previous behaviour.
    keys = []
    for i in xrange(iterations):
        keys.append(self._box.add(self._template % i))

    seen = []
    for _ in xrange(iterations):
        key, msg = self._box.popitem()
        # Each popped key must be one we added and not seen before.
        self.assertIn(key, keys)
        self.assertNotIn(key, seen)
        seen.append(key)
        # The payload is the index the message was created with.
        self.assertEqual(int(msg.get_payload()), keys.index(key))

    self.assertEqual(len(self._box), 0)
    for key in keys:
        self.assertRaises(KeyError, lambda: self._box[key])
def test_initialize_with_nothing(self):
    # A message built with no arguments is empty and not multipart
    blank = self._factory()
    self._post_initialize_hook(blank)

    expected_types = (email.message.Message, mailbox.Message, self._factory)
    for expected_type in expected_types:
        self.assertIsInstance(blank, expected_type)

    self.assertEqual(blank.keys(), [])
    self.assertFalse(blank.is_multipart())
    self.assertIsNone(blank.get_payload())
def _check_sample(self, msg):
    # Compare a mailbox.Message against the sample headers and payloads
    self.assertIsInstance(msg, email.message.Message)
    self.assertIsInstance(msg, mailbox.Message)

    # Each sample header value must appear among the values for its name.
    for header_name, header_value in _sample_headers.items():
        all_values = msg.get_all(header_name)
        self.assertIn(header_value, all_values)

    self.assertTrue(msg.is_multipart())
    part_count = len(msg.get_payload())
    self.assertEqual(part_count, len(_sample_payloads))

    # Sub-parts are plain email messages, not mailbox.Message instances.
    for position, expected_payload in enumerate(_sample_payloads):
        part = msg.get_payload(position)
        self.assertIsInstance(part, email.message.Message)
        self.assertNotIsInstance(part, mailbox.Message)
        self.assertEqual(part.get_payload(), expected_payload)
def test_add_8bit_body(self):
    # Add a message with an 8-bit, non-Latin body and read it back
    key = self._box.add(self._non_latin_bin_msg)
    self.assertEqual(self._box.get_bytes(key), self._non_latin_bin_msg)

    # Reading through get_file() is expected to yield the platform's
    # line separator in place of b'\n'.
    expected_file_bytes = self._non_latin_bin_msg.replace(
        b'\n', os.linesep.encode())
    with self._box.get_file(key) as f:
        self.assertEqual(f.read(), expected_file_bytes)

    # NOTE(review): the expected payload had been reduced to '?'
    # placeholders ("??, ??? ?????.\n"), which looks like a lossy
    # re-encoding of non-ASCII text.  Restored to the Cyrillic sentence
    # with the same word lengths -- confirm it matches the body of the
    # _non_latin_bin_msg fixture.
    self.assertEqual(self._box[key].get_payload(), "Да, они летят.\n")
def test_get(self):
    # Fetch messages through get(), before and after reopening the box
    first_key = self._box.add(self._template % 0)
    fetched = self._box.get(first_key)
    self.assertEqual(fetched['from'], 'foo')
    self.assertEqual(fetched.get_payload(), '0\n')

    # An unknown key yields None, or exactly the default that was passed.
    missing = self._box.get('foo')
    self.assertIsNone(missing)
    missing_with_default = self._box.get('foo', False)
    self.assertIs(missing_with_default, False)

    # Close and recreate the mailbox on the same path, then add and
    # fetch another message.
    self._box.close()
    self._box = self._factory(self._path)
    second_key = self._box.add(self._template % 1)
    fetched = self._box.get(second_key)
    self.assertEqual(fetched['from'], 'foo')
    self.assertEqual(fetched.get_payload(), '1\n')
def test_popitem(self, iterations=10):
    # Get and remove an arbitrary (key, message) using popitem()
    #
    # iterations: how many messages to add and then pop.  The loops now
    # honour this parameter instead of a hard-coded 10; the default keeps
    # the previous behaviour.
    keys = []
    for i in range(iterations):
        keys.append(self._box.add(self._template % i))

    seen = []
    for _ in range(iterations):
        key, msg = self._box.popitem()
        # Each popped key must be one we added and not seen before.
        self.assertIn(key, keys)
        self.assertNotIn(key, seen)
        seen.append(key)
        # The payload is the index the message was created with.
        self.assertEqual(int(msg.get_payload()), keys.index(key))

    self.assertEqual(len(self._box), 0)
    for key in keys:
        self.assertRaises(KeyError, lambda: self._box[key])
def test_initialize_with_nothing(self):
    # Initialize without arguments
    msg = self._factory()
    self._post_initialize_hook(msg)
    self.assertIsInstance(msg, email.message.Message)
    self.assertIsInstance(msg, mailbox.Message)
    self.assertIsInstance(msg, self._factory)
    self.assertEqual(msg.keys(), [])
    self.assertFalse(msg.is_multipart())
    # Identity check: assertEqual(x, None) would also accept any object
    # whose __eq__ claims equality with None.
    self.assertIsNone(msg.get_payload())