我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用email.policy()。
def updateTopic(self, topic): if not self.initialized: return [] if time.time() - self.time > 60.: self.connect() try: (_, _, first, last, _) = self.conn.group(topic) except Exception as e: print(e, datetime.datetime.now()) traceback.print_exc() self.connect() return start = max(self.groups[topic] + 1, first) res = [] headers = ("From", "Newsgroups", "Subject", "Date") registry = HeaderRegistry() registry.map_to_type('From', UnstructuredHeader) policy = EmailPolicy(header_factory=registry) while start <= last: try: artic = self.conn.article(start)[1] raw_msg = b'\r\n'.join(artic.lines) mime_msg = email.message_from_bytes(raw_msg, policy=policy) hdr = "<code>" for h in headers: hdr += "%s: %s\r\n" % (h, html.escape(mime_msg[h])) content = "" for part in mime_msg.walk(): if part.get_content_type() == 'text/plain': content += html.escape(part.get_content()) is_plus_one = isPlusOne(content) hdr += "%s: %s\r\n" % ("is_plus_one", is_plus_one) hdr += "</code>\r\n" res.append([is_plus_one, hdr + content]) except Exception as e: print(e, datetime.datetime.now()) traceback.print_exc() start += 1 self.groups[topic] = last
def test_as_string_policy(self): msg = self._msgobj('msg_01.txt') newpolicy = msg.policy.clone(linesep='\r\n') fullrepr = msg.as_string(policy=newpolicy) s = StringIO() g = Generator(s, policy=newpolicy) g.flatten(msg) self.assertEqual(fullrepr, s.getvalue())
def test_as_bytes_policy(self): msg = self._msgobj('msg_01.txt') newpolicy = msg.policy.clone(linesep='\r\n') fullrepr = msg.as_bytes(policy=newpolicy) s = BytesIO() g = BytesGenerator(s,policy=newpolicy) g.flatten(msg) self.assertEqual(fullrepr, s.getvalue()) # test_headerregistry.TestContentTypeHeader.bad_params
def test_bytes_parser_on_exception_does_not_close_file(self): with openfile('msg_15.txt', 'rb') as fp: bytesParser = email.parser.BytesParser self.assertRaises(email.errors.StartBoundaryNotFoundDefect, bytesParser(policy=email.policy.strict).parse, fp) self.assertFalse(fp.closed)
def test_parser_on_exception_does_not_close_file(self): with openfile('msg_15.txt', 'r') as fp: parser = email.parser.Parser self.assertRaises(email.errors.StartBoundaryNotFoundDefect, parser(policy=email.policy.strict).parse, fp) self.assertFalse(fp.closed)
def parse_message(msgstring): return email.message_from_string(msgstring, policy=email.policy.default)