Python email.message 模块,get_payload() 实例源码

我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用email.message.get_payload()

项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pop(self):
        # Get and remove a message using pop()
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        key2 = self._box.add(self._template % 2)
        self.assertIn(key2, self._box)
        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
        self.assertNotIn(key2, self._box)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
        self.assertNotIn(key1, self._box)
        self.assertEqual(len(self._box), 0)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_visible(self):
        # Get, set, and update visible headers
        msg = mailbox.BabylMessage(_sample_message)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), [])
        self.assertIs(visible.get_payload(), None)
        visible['User-Agent'] = 'FooBar 1.0'
        visible['X-Whatever'] = 'Blah'
        self.assertEqual(msg.get_visible().keys(), [])
        msg.set_visible(visible)
        visible = msg.get_visible()
        self.assertTrue(visible.keys() == ['User-Agent', 'X-Whatever'])
        self.assertTrue(visible['User-Agent'] == 'FooBar 1.0')
        self.assertEqual(visible['X-Whatever'], 'Blah')
        self.assertIs(visible.get_payload(), None)
        msg.update_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertIs(visible.get_payload(), None)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
                                          'Subject'])
        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(visible[header], msg[header])
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pop(self):
        # Get and remove a message using pop()
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        key2 = self._box.add(self._template % 2)
        self.assertIn(key2, self._box)
        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
        self.assertNotIn(key2, self._box)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
        self.assertNotIn(key1, self._box)
        self.assertEqual(len(self._box), 0)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_visible(self):
        # Get, set, and update visible headers
        msg = mailbox.BabylMessage(_sample_message)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), [])
        self.assertIsNone(visible.get_payload())
        visible['User-Agent'] = 'FooBar 1.0'
        visible['X-Whatever'] = 'Blah'
        self.assertEqual(msg.get_visible().keys(), [])
        msg.set_visible(visible)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
        self.assertEqual(visible['X-Whatever'], 'Blah')
        self.assertIsNone(visible.get_payload())
        msg.update_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertIsNone(visible.get_payload())
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
                                          'Subject'])
        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(visible[header], msg[header])
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def test_generate_email(self):
    message = mail_request_handler.MailRequestHandler._generate_email(
        'to', 'from', 'cc', 'subject', 'body')
    self.assertEqual('from', message['From'])
    self.assertEqual('to', message['To'])
    self.assertEqual('cc', message['Cc'])
    self.assertEqual('subject', message['Subject'])
    text, html = message.get_payload()
    self.assertEqual('text/plain', text.get_content_type())
    self.assertEqual('utf-8', text.get_content_charset())
    content = text.get_payload()
    if text['content-transfer-encoding'] != '7bit':
      content = content.decode(text['content-transfer-encoding'])
    self.assertEqual('body', content)

    self.assertEqual('text/html', html.get_content_type())
    self.assertEqual('utf-8', html.get_content_charset())
    content = html.get_payload()
    if html['content-transfer-encoding'] != '7bit':
      content = content.decode(html['content-transfer-encoding'])
    self.assertEqual('body', content)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pop(self):
        # Get and remove a message using pop()
        key0 = self._box.add(self._template % 0)
        self.assertIn(key0, self._box)
        key1 = self._box.add(self._template % 1)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
        self.assertNotIn(key0, self._box)
        self.assertIn(key1, self._box)
        key2 = self._box.add(self._template % 2)
        self.assertIn(key2, self._box)
        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
        self.assertNotIn(key2, self._box)
        self.assertIn(key1, self._box)
        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
        self.assertNotIn(key1, self._box)
        self.assertEqual(len(self._box), 0)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_set_MM(self):
        # Set with a MaildirMessage instance
        msg0 = mailbox.MaildirMessage(self._template % 0)
        msg0.set_flags('TP')
        key = self._box.add(msg0)
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'PT')
        msg1 = mailbox.MaildirMessage(self._template % 1)
        self._box[key] = msg1
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), '')
        self.assertEqual(msg_returned.get_payload(), '1\n')
        msg2 = mailbox.MaildirMessage(self._template % 2)
        msg2.set_info('2,S')
        self._box[key] = msg2
        self._box[key] = self._template % 3
        msg_returned = self._box.get_message(key)
        self.assertEqual(msg_returned.get_subdir(), 'new')
        self.assertEqual(msg_returned.get_flags(), 'S')
        self.assertEqual(msg_returned.get_payload(), '3\n')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_visible(self):
        # Get, set, and update visible headers
        msg = mailbox.BabylMessage(_sample_message)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), [])
        self.assertIsNone(visible.get_payload())
        visible['User-Agent'] = 'FooBar 1.0'
        visible['X-Whatever'] = 'Blah'
        self.assertEqual(msg.get_visible().keys(), [])
        msg.set_visible(visible)
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
        self.assertEqual(visible['X-Whatever'], 'Blah')
        self.assertIsNone(visible.get_payload())
        msg.update_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
        self.assertIsNone(visible.get_payload())
        visible = msg.get_visible()
        self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
                                          'Subject'])
        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(visible[header], msg[header])
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _check_sample(self, msg):
        # Inspect a mailbox.Message representation of the sample message
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        for key, value in _sample_headers.iteritems():
            self.assertIn(value, msg.get_all(key))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for i, payload in enumerate(_sample_payloads):
            part = msg.get_payload(i)
            self.assertIsInstance(part, email.message.Message)
            self.assertNotIsInstance(part, mailbox.Message)
            self.assertEqual(part.get_payload(), payload)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertIsNone(self._box.get('foo'))
        self.assertFalse(self._box.get('foo', False))
        self._box.close()
        self._box = self._factory(self._path, factory=rfc822.Message)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.fp.read(), '1' + os.linesep)
        msg.fp.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assertIsInstance(msg0, mailbox.Message)
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0\n')
        self._check_sample(self._box.get_message(key1))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _check_iteration(self, method, do_keys, do_values, repetitions=10):
        for value in method():
            self.fail("Not empty")
        keys, values = [], []
        for i in xrange(repetitions):
            keys.append(self._box.add(self._template % i))
            values.append(self._template % i)
        if do_keys and not do_values:
            returned_keys = list(method())
        elif do_values and not do_keys:
            returned_values = list(method())
        else:
            returned_keys, returned_values = [], []
            for key, value in method():
                returned_keys.append(key)
                returned_values.append(value)
        if do_keys:
            self.assertEqual(len(keys), len(returned_keys))
            self.assertEqual(set(keys), set(returned_keys))
        if do_values:
            count = 0
            for value in returned_values:
                self.assertEqual(value['from'], 'foo')
                self.assertLess(int(value.get_payload()), repetitions)
                count += 1
            self.assertEqual(len(values), count)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_popitem(self, iterations=10):
        # Get and remove an arbitrary (key, message) using popitem()
        keys = []
        for i in xrange(10):
            keys.append(self._box.add(self._template % i))
        seen = []
        for i in xrange(10):
            key, msg = self._box.popitem()
            self.assertIn(key, keys)
            self.assertNotIn(key, seen)
            seen.append(key)
            self.assertEqual(int(msg.get_payload()), keys.index(key))
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box[key])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_initialize_with_nothing(self):
        # Initialize without arguments
        msg = self._factory()
        self._post_initialize_hook(msg)
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        self.assertIsInstance(msg, self._factory)
        self.assertEqual(msg.keys(), [])
        self.assertFalse(msg.is_multipart())
        self.assertIsNone(msg.get_payload())
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _check_sample(self, msg):
        # Inspect a mailbox.Message representation of the sample message
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        for key, value in _sample_headers.iteritems():
            self.assertIn(value, msg.get_all(key))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for i, payload in enumerate(_sample_payloads):
            part = msg.get_payload(i)
            self.assertIsInstance(part, email.message.Message)
            self.assertNotIsInstance(part, mailbox.Message)
            self.assertEqual(part.get_payload(), payload)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertIsNone(self._box.get('foo'))
        self.assertFalse(self._box.get('foo', False))
        self._box.close()
        self._box = self._factory(self._path, factory=rfc822.Message)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.fp.read(), '1' + os.linesep)
        msg.fp.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assertIsInstance(msg0, mailbox.Message)
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0\n')
        self._check_sample(self._box.get_message(key1))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _check_iteration(self, method, do_keys, do_values, repetitions=10):
        for value in method():
            self.fail("Not empty")
        keys, values = [], []
        for i in xrange(repetitions):
            keys.append(self._box.add(self._template % i))
            values.append(self._template % i)
        if do_keys and not do_values:
            returned_keys = list(method())
        elif do_values and not do_keys:
            returned_values = list(method())
        else:
            returned_keys, returned_values = [], []
            for key, value in method():
                returned_keys.append(key)
                returned_values.append(value)
        if do_keys:
            self.assertEqual(len(keys), len(returned_keys))
            self.assertEqual(set(keys), set(returned_keys))
        if do_values:
            count = 0
            for value in returned_values:
                self.assertEqual(value['from'], 'foo')
                self.assertLess(int(value.get_payload()), repetitions)
                count += 1
            self.assertEqual(len(values), count)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_popitem(self, iterations=10):
        # Get and remove an arbitrary (key, message) using popitem()
        keys = []
        for i in xrange(10):
            keys.append(self._box.add(self._template % i))
        seen = []
        for i in xrange(10):
            key, msg = self._box.popitem()
            self.assertIn(key, keys)
            self.assertNotIn(key, seen)
            seen.append(key)
            self.assertEqual(int(msg.get_payload()), keys.index(key))
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box[key])
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_initialize_with_nothing(self):
        # Initialize without arguments
        msg = self._factory()
        self._post_initialize_hook(msg)
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        self.assertIsInstance(msg, self._factory)
        self.assertEqual(msg.keys(), [])
        self.assertFalse(msg.is_multipart())
        self.assertIsNone(msg.get_payload())
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _check_sample(self, msg):
        # Inspect a mailbox.Message representation of the sample message
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        for key, value in _sample_headers.items():
            self.assertIn(value, msg.get_all(key))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for i, payload in enumerate(_sample_payloads):
            part = msg.get_payload(i)
            self.assertIsInstance(part, email.message.Message)
            self.assertNotIsInstance(part, mailbox.Message)
            self.assertEqual(part.get_payload(), payload)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_add_8bit_body(self):
        key = self._box.add(self._non_latin_bin_msg)
        self.assertEqual(self._box.get_bytes(key),
                         self._non_latin_bin_msg)
        with self._box.get_file(key) as f:
            self.assertEqual(f.read(),
                             self._non_latin_bin_msg.replace(b'\n',
                                os.linesep.encode()))
        self.assertEqual(self._box[key].get_payload(),
                        "??, ??? ?????.\n")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertIs(self._box.get('foo'), None)
        self.assertIs(self._box.get('foo', False), False)
        self._box.close()
        self._box = self._factory(self._path)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '1\n')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assertIsInstance(msg0, mailbox.Message)
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0\n')
        self._check_sample(self._box.get_message(key1))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_popitem(self, iterations=10):
        # Get and remove an arbitrary (key, message) using popitem()
        keys = []
        for i in range(10):
            keys.append(self._box.add(self._template % i))
        seen = []
        for i in range(10):
            key, msg = self._box.popitem()
            self.assertIn(key, keys)
            self.assertNotIn(key, seen)
            seen.append(key)
            self.assertEqual(int(msg.get_payload()), keys.index(key))
        self.assertEqual(len(self._box), 0)
        for key in keys:
            self.assertRaises(KeyError, lambda: self._box[key])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_initialize_with_nothing(self):
        # Initialize without arguments
        msg = self._factory()
        self._post_initialize_hook(msg)
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        self.assertIsInstance(msg, self._factory)
        self.assertEqual(msg.keys(), [])
        self.assertFalse(msg.is_multipart())
        self.assertEqual(msg.get_payload(), None)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _check_sample(self, msg):
        # Inspect a mailbox.Message representation of the sample message
        self.assertIsInstance(msg, email.message.Message)
        self.assertIsInstance(msg, mailbox.Message)
        for key, value in _sample_headers.iteritems():
            self.assertIn(value, msg.get_all(key))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for i, payload in enumerate(_sample_payloads):
            part = msg.get_payload(i)
            self.assertIsInstance(part, email.message.Message)
            self.assertNotIsInstance(part, mailbox.Message)
            self.assertEqual(part.get_payload(), payload)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_get(self):
        # Retrieve messages using get()
        key0 = self._box.add(self._template % 0)
        msg = self._box.get(key0)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertIsNone(self._box.get('foo'))
        self.assertFalse(self._box.get('foo', False))
        self._box.close()
        self._box = self._factory(self._path, factory=rfc822.Message)
        key1 = self._box.add(self._template % 1)
        msg = self._box.get(key1)
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.fp.read(), '1' + os.linesep)
        msg.fp.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_getitem(self):
        # Retrieve message using __getitem__()
        key0 = self._box.add(self._template % 0)
        msg = self._box[key0]
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.get_payload(), '0\n')
        self.assertRaises(KeyError, lambda: self._box['foo'])
        self._box.discard(key0)
        self.assertRaises(KeyError, lambda: self._box[key0])
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_get_message(self):
        # Get Message representations of messages
        key0 = self._box.add(self._template % 0)
        key1 = self._box.add(_sample_message)
        msg0 = self._box.get_message(key0)
        self.assertIsInstance(msg0, mailbox.Message)
        self.assertEqual(msg0['from'], 'foo')
        self.assertEqual(msg0.get_payload(), '0\n')
        self._check_sample(self._box.get_message(key1))