Python email 模块,message_from_string() 实例源码

我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用email.message_from_string()

项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_split_long_continuation(self):
        eq = self.ndiffAssertEqual
        msg = email.message_from_string("""\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
""")
        sfp = StringIO()
        g = Generator(sfp)
        g.flatten(msg)
        eq(sfp.getvalue(), """\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
""")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_boundary_with_leading_space(self):
        eq = self.assertEqual
        msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
''')
        self.failUnless(msg.is_multipart())
        eq(msg.get_boundary(), '    XXXX')
        eq(len(msg.get_payload()), 2)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_message_from_string_with_class(self):
        unless = self.failUnless
        fp = openfile('msg_01.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        # Create a subclass
        class MyMessage(Message):
            pass

        msg = email.message_from_string(text, MyMessage)
        unless(isinstance(msg, MyMessage))
        # Try something more complicated
        fp = openfile('msg_02.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        msg = email.message_from_string(text, MyMessage)
        for subpart in msg.walk():
            unless(isinstance(subpart, MyMessage))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        all = module.__all__[:]
        all.sort()
        self.assertEqual(all, [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_param(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        eq(msg.get_param('bar', header='x-header'), 'two')
        eq(msg.get_param('quuz', header='x-header'), None)
        eq(msg.get_param('quuz'), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_param('foo', header='x-header'), '')
        eq(msg.get_param('bar', header='x-header'), 'one')
        eq(msg.get_param('baz', header='x-header'), 'two')
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_split_long_continuation(self):
        eq = self.ndiffAssertEqual
        msg = email.message_from_string("""\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
""")
        sfp = StringIO()
        g = Generator(sfp)
        g.flatten(msg)
        eq(sfp.getvalue(), """\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
""")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_boundary_with_leading_space(self):
        eq = self.assertEqual
        msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
''')
        self.failUnless(msg.is_multipart())
        eq(msg.get_boundary(), '    XXXX')
        eq(len(msg.get_payload()), 2)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_message_from_string_with_class(self):
        unless = self.failUnless
        fp = openfile('msg_01.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        # Create a subclass
        class MyMessage(Message):
            pass

        msg = email.message_from_string(text, MyMessage)
        unless(isinstance(msg, MyMessage))
        # Try something more complicated
        fp = openfile('msg_02.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        msg = email.message_from_string(text, MyMessage)
        for subpart in msg.walk():
            unless(isinstance(subpart, MyMessage))
项目:PySMS    作者:clayshieh    | 项目源码 | 文件源码
def check_email(self, uid, email_data, current_time):
        mail = email.message_from_string(email_data[0][1])
        mail_time = email.utils.mktime_tz(email.utils.parsedate_tz(mail["Date"]))
        if current_time - mail_time < self.window * 60:
            if mail.get_content_maintype() == "multipart":
                for part in mail.walk():
                    if part.get_content_maintype() != 'multipart' and part.get('Content-Disposition') is not None:
                        response = part.get_payload(decode=True)
                        response = response.split(self.delimiter)
                        if len(response) == 2:
                            key = response[0].strip()
                            value = response[1].strip()
                            # If hook is not valid then also ignore
                            if not self.execute_hook(key, value):
                                self.logger.info("Adding failed hook with uid: {uid} to ignore.".format(uid=uid))
                                self.add_ignore(mail, uid)
                            return
        # Clean_hook_dict will take care of this later
        self.logger.info("Email with uid: {uid} is expired, ignoring in next check".format(uid=uid))
        # Add uid to ignore if uid is expired so it knows not to request it next cycle
        self.add_ignore(mail, uid)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_split_long_continuation(self):
        eq = self.ndiffAssertEqual
        msg = email.message_from_string("""\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
""")
        sfp = StringIO()
        g = Generator(sfp)
        g.flatten(msg)
        eq(sfp.getvalue(), """\
Subject: bug demonstration
 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
 more text

test
""")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_binary_body_with_encode_7or8bit(self):
        # Issue 17171.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), bytesdata)
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')
        s = StringIO()
        g = Generator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_string(wireform)
        self.assertEqual(msg.get_payload(), bytesdata)
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)
        self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_boundary_with_leading_space(self):
        eq = self.assertEqual
        msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
''')
        self.assertTrue(msg.is_multipart())
        eq(msg.get_boundary(), '    XXXX')
        eq(len(msg.get_payload()), 2)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_message_from_string_with_class(self):
        fp = openfile('msg_01.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        # Create a subclass
        class MyMessage(Message):
            pass

        msg = email.message_from_string(text, MyMessage)
        self.assertIsInstance(msg, MyMessage)
        # Try something more complicated
        fp = openfile('msg_02.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        msg = email.message_from_string(text, MyMessage)
        for subpart in msg.walk():
            self.assertIsInstance(subpart, MyMessage)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_get_param(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        eq(msg.get_param('bar', header='x-header'), 'two')
        eq(msg.get_param('quuz', header='x-header'), None)
        eq(msg.get_param('quuz'), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_param('foo', header='x-header'), '')
        eq(msg.get_param('bar', header='x-header'), 'one')
        eq(msg.get_param('baz', header='x-header'), 'two')
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_boundary_with_leading_space(self):
        eq = self.assertEqual
        msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
''')
        self.assertTrue(msg.is_multipart())
        eq(msg.get_boundary(), '    XXXX')
        eq(len(msg.get_payload()), 2)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_message_from_string_with_class(self):
        fp = openfile('msg_01.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        # Create a subclass
        class MyMessage(Message):
            pass

        msg = email.message_from_string(text, MyMessage)
        self.assertIsInstance(msg, MyMessage)
        # Try something more complicated
        fp = openfile('msg_02.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        msg = email.message_from_string(text, MyMessage)
        for subpart in msg.walk():
            self.assertIsInstance(subpart, MyMessage)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        all = module.__all__
        all.sort()
        self.assertEqual(all, [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_rfc2231_unencoded_then_encoded_segments(self):
        eq = self.assertEqual
        m = """\
Content-Type: application/x-foo;
\tname*0=\"us-ascii'en-us'My\";
\tname*1*=\" Document\";
\tname*2*=\" For You\"

"""
        msg = email.message_from_string(m)
        charset, language, s = msg.get_param('name')
        eq(charset, 'us-ascii')
        eq(language, 'en-us')
        eq(s, 'My Document For You')



# Tests to ensure that signed parts of an email are completely preserved, as
# required by RFC1847 section 2.1.  Note that these are incomplete, because the
# email package does not currently always preserve the body.  See issue 1670765.
项目:egt    作者:spanezz    | 项目源码 | 文件源码
def parse(self, lines):
        """
        Parse a metadata section from a Lines object
        """
        # Parse raw lines
        self._lineno = lines.lineno

        # Get everything until we reach an empty line
        while True:
            line = lines.next()
            # Stop at an empty line or at EOF
            if not line: break
            self._lines.append(line)

        # Parse fields in the same way as email headers
        import email
        for k, v in email.message_from_string("\n".join(self._lines)).items():
            self._raw[k.lower()] = v.strip()

        # Extract well known values

        # Tags
        f = self._raw.get("tags", None)
        if f is not None:
            self.tags.update(re.split("[ ,\t]+", f))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_split_long_continuation(self):
        eq = self.ndiffAssertEqual
        msg = email.message_from_string("""\
Subject: bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text

test
""")
        sfp = StringIO()
        g = Generator(sfp)
        g.flatten(msg)
        eq(sfp.getvalue(), """\
Subject: bug demonstration
 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
 more text

test
""")
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_binary_body_with_encode_7or8bit(self):
        # Issue 17171.
        bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
        msg = MIMEApplication(bytesdata, _encoder=encoders.encode_7or8bit)
        # Treated as a string, this will be invalid code points.
        self.assertEqual(msg.get_payload(), bytesdata)
        self.assertEqual(msg.get_payload(decode=True), bytesdata)
        self.assertEqual(msg['Content-Transfer-Encoding'], '8bit')
        s = StringIO()
        g = Generator(s)
        g.flatten(msg)
        wireform = s.getvalue()
        msg2 = email.message_from_string(wireform)
        self.assertEqual(msg.get_payload(), bytesdata)
        self.assertEqual(msg2.get_payload(decode=True), bytesdata)
        self.assertEqual(msg2['Content-Transfer-Encoding'], '8bit')
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_boundary_with_leading_space(self):
        eq = self.assertEqual
        msg = email.message_from_string('''\
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="    XXXX"

--    XXXX
Content-Type: text/plain


--    XXXX
Content-Type: text/plain

--    XXXX--
''')
        self.assertTrue(msg.is_multipart())
        eq(msg.get_boundary(), '    XXXX')
        eq(len(msg.get_payload()), 2)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_message_from_string_with_class(self):
        fp = openfile('msg_01.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        # Create a subclass
        class MyMessage(Message):
            pass

        msg = email.message_from_string(text, MyMessage)
        self.assertIsInstance(msg, MyMessage)
        # Try something more complicated
        fp = openfile('msg_02.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        msg = email.message_from_string(text, MyMessage)
        for subpart in msg.walk():
            self.assertIsInstance(subpart, MyMessage)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, message=None):
        """Initialize a Message instance."""
        if isinstance(message, email.message.Message):
            self._become_message(copy.deepcopy(message))
            if isinstance(message, Message):
                message._explain_to(self)
        elif isinstance(message, str):
            self._become_message(email.message_from_string(message))
        elif hasattr(message, "read"):
            self._become_message(email.message_from_file(message))
        elif message is None:
            email.message.Message.__init__(self)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_bad_param(self):
        msg = email.message_from_string("Content-Type: blarg; baz; boo\n")
        self.assertEqual(msg.get_param('baz'), '')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_missing_filename(self):
        msg = email.message_from_string("From: foo\n")
        self.assertEqual(msg.get_filename(), None)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_bogus_filename(self):
        msg = email.message_from_string(
        "Content-Disposition: blarg; filename\n")
        self.assertEqual(msg.get_filename(), '')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_missing_boundary(self):
        msg = email.message_from_string("From: foo\n")
        self.assertEqual(msg.get_boundary(), None)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_params(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            'X-Header: foo=one; bar=two; baz=three\n')
        eq(msg.get_params(header='x-header'),
           [('foo', 'one'), ('bar', 'two'), ('baz', 'three')])
        msg = email.message_from_string(
            'X-Header: foo; bar=one; baz=two\n')
        eq(msg.get_params(header='x-header'),
           [('foo', ''), ('bar', 'one'), ('baz', 'two')])
        eq(msg.get_params(), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_params(header='x-header'),
           [('foo', ''), ('bar', 'one'), ('baz', 'two')])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_param_with_semis_in_quotes(self):
        msg = email.message_from_string(
            'Content-Type: image/pjpeg; name="Jim&amp;&amp;Jill"\n')
        self.assertEqual(msg.get_param('name'), 'Jim&amp;&amp;Jill')
        self.assertEqual(msg.get_param('name', unquote=False),
                         '"Jim&amp;&amp;Jill"')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_has_key(self):
        msg = email.message_from_string('Header: exists')
        self.failUnless(msg.has_key('header'))
        self.failUnless(msg.has_key('Header'))
        self.failUnless(msg.has_key('HEADER'))
        self.failIf(msg.has_key('headeri'))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_another_long_multiline_header(self):
        eq = self.ndiffAssertEqual
        m = '''\
Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with Microsoft SMTPSVC(5.0.2195.4905);
\tWed, 16 Oct 2002 07:41:11 -0700'''
        msg = email.message_from_string(m)
        eq(msg.as_string(), '''\
Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with
\tMicrosoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700

''')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_message_from_string(self):
        fp = openfile('msg_01.txt')
        try:
            text = fp.read()
        finally:
            fp.close()
        msg = email.message_from_string(text)
        s = StringIO()
        # Don't wrap/continue long headers since we're trying to test
        # idempotency.
        g = Generator(s, maxheaderlen=0)
        g.flatten(msg)
        self.assertEqual(text, s.getvalue())
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_strip_line_feed_and_carriage_return_in_headers(self):
        eq = self.assertEqual
        # For [ 1002475 ] email message parser doesn't handle \r\n correctly
        value1 = 'text'
        value2 = 'more text'
        m = 'Header: %s\r\nNext-Header: %s\r\n\r\nBody\r\n\r\n' % (
            value1, value2)
        msg = email.message_from_string(m)
        eq(msg.get('Header'), value1)
        eq(msg.get('Next-Header'), value2)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_rfc2822_header_syntax(self):
        eq = self.assertEqual
        m = '>From: foo\nFrom: bar\n!"#QUX;~: zoo\n\nbody'
        msg = email.message_from_string(m)
        eq(len(msg.keys()), 3)
        keys = msg.keys()
        keys.sort()
        eq(keys, ['!"#QUX;~', '>From', 'From'])
        eq(msg.get_payload(), 'body')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_rfc2822_one_character_header(self):
        eq = self.assertEqual
        m = 'A: first header\nB: second header\nCC: third header\n\nbody'
        msg = email.message_from_string(m)
        headers = msg.keys()
        headers.sort()
        eq(headers, ['A', 'B', 'CC'])
        eq(msg.get_payload(), 'body')