我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用email.parser.BytesParser()。
def split_email(self, raw_email): parsed_email = BytesParser().parsebytes(raw_email) to_keep = [] attachments = [] if parsed_email.is_multipart(): for p in parsed_email.get_payload(): if p.get_filename(): filename = decode_header(p.get_filename()) if filename[0][1]: filename = filename[0][0].decode(filename[0][1]) else: filename = filename[0][0] attachments.append(File(p.get_payload(decode=True), filename)) else: to_keep.append(p) else: to_keep.append(parsed_email.get_payload()) return to_keep, attachments, parsed_email
def message_from_bytes(s, *args, **kws): """Parse a bytes string into a Message object model. Optional _class and strict are passed to the Parser constructor. """ from email.parser import BytesParser return BytesParser(*args, **kws).parsebytes(s)
def message_from_binary_file(fp, *args, **kws): """Read a binary file and parse its contents into a Message object model. Optional _class and strict are passed to the Parser constructor. """ from email.parser import BytesParser return BytesParser(*args, **kws).parse(fp)
def get_content(self, raw): data = base64.urlsafe_b64decode(raw) email_parser = EmailParser(policy=policy.default) email = email_parser.parsebytes(data) plain = email.get_body(preferencelist=('plain',)) body = None if plain: body = plain.get_payload() email_dict = dict(email) email_dict['body'] = body return email_dict
def _format_denied_recipients(self, original_mail: bytes, recipients: Sequence[str]) -> bytes: parser = BytesParser() # TODO: fix type annotation when typeshed has better stubs msg = None # type: Message msg = parser.parsebytes(original_mail, True) # type: ignore msg["Subject"] = "[mailforwarder error] Re: %s" % msg["Subject"] msg["To"] = msg["From"] msg["From"] = "mailforwarder bounce <>" rcptlist = "" for rcpt in recipients: rcptlist = "%s\n%s" % (" * %s" % rcpt, rcptlist,) txt = denied_recipients_template.format(rcptlist=rcptlist) msg.set_payload(txt, charset='utf-8') return msg.as_bytes(policy=policy.SMTP)
def add_received_header(self, peer: Tuple[str, int], msg: bytes, channel: PatchedSMTPChannel) -> bytes: parser = BytesParser(_class=SaneMessage) # TODO: remove type annotation when issue is fixed new_msg = None # type: SaneMessage new_msg = parser.parsebytes(msg) # type: ignore new_msg.prepend_header("Received", "from %s (%s:%s)\r\n\tby %s (%s [%s:%s]) with SMTP;\r\n\t%s" % (channel.seen_greeting, peer[0], peer[1], self.server_name, self.daemon_name, self._localaddr[0], self._localaddr[1], timezone.now().strftime("%a, %d %b %Y %H:%M:%S %z (%Z)"))) return new_msg.as_bytes(policy=policy.SMTP)