Python email.header 模块,Header() 实例源码

我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用email.header.Header()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
        """Create the generator for message flattening.

        outfp is the output file-like object for writing the message to.  It
        must have a write() method.

        Optional mangle_from_ is a flag that, when True (the default), escapes
        From_ lines in the body of the message by putting a `>' in front of
        them.

        Optional maxheaderlen specifies the longest length for a non-continued
        header.  When a header line is longer (in characters, with tabs
        expanded to 8 spaces) than maxheaderlen, the header will split as
        defined in the Header class.  Set maxheaderlen to zero to disable
        header wrapping.  The default is 78, as recommended (but not required)
        by RFC 2822.
        """
        self._fp = outfp
        self._mangle_from_ = mangle_from_
        self._maxheaderlen = maxheaderlen
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
        """Create the generator for message flattening.

        outfp is the output file-like object for writing the message to.  It
        must have a write() method.

        Optional mangle_from_ is a flag that, when True (the default), escapes
        From_ lines in the body of the message by putting a `>' in front of
        them.

        Optional maxheaderlen specifies the longest length for a non-continued
        header.  When a header line is longer (in characters, with tabs
        expanded to 8 spaces) than maxheaderlen, the header will split as
        defined in the Header class.  Set maxheaderlen to zero to disable
        header wrapping.  The default is 78, as recommended (but not required)
        by RFC 2822.
        """
        self._fp = outfp
        self._mangle_from_ = mangle_from_
        self._maxheaderlen = maxheaderlen
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_param(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        eq(msg.get_param('bar', header='x-header'), 'two')
        eq(msg.get_param('quuz', header='x-header'), None)
        eq(msg.get_param('quuz'), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_param('foo', header='x-header'), '')
        eq(msg.get_param('bar', header='x-header'), 'one')
        eq(msg.get_param('baz', header='x-header'), 'two')
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        all = module.__all__[:]
        all.sort()
        self.assertEqual(all, [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def sanitize_address(addr, encoding='utf-8'):
    if isinstance(addr, string_types):
        addr = parseaddr(force_text(addr))
    nm, addr = addr

    try:
        nm = Header(nm, encoding).encode()
    except UnicodeEncodeError:
        nm = Header(nm, 'utf-8').encode()
    try:
        addr.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in addr:
            localpart, domain = addr.split('@', 1)
            localpart = str(Header(localpart, encoding))
            domain = domain.encode('idna').decode('ascii')
            addr = '@'.join([localpart, domain])
        else:
            addr = Header(addr, encoding).encode()
    return formataddr((nm, addr))
项目:zabbix_manager    作者:BillWang139967    | 项目源码 | 文件源码
def parse_attachment(self, message_part):
        content_disposition = message_part.get("Content-Disposition", None)
        if content_disposition:
            dispositions = content_disposition.strip().split(";")
            if bool(content_disposition and dispositions[0].lower() == "attachment"):

                file_data = message_part.get_payload(decode=True)
                attachment = {}
                attachment["content_type"] = message_part.get_content_type()
                attachment["size"] = len(file_data)
                deName = email.Header.decode_header(message_part.get_filename())[0]
                name = deName[0]
                if deName[1] != None:
                    name = unicode(deName[0], deName[1])
                print name
                attachment["name"] = name
                attachment["data"] = file_data
                '''????
                fileobject = open(name, "wb")
                fileobject.write(file_data)
                fileobject.close()
                '''
                return attachment
        return None
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def forbid_multi_line_headers(name, val, encoding):
    """Forbids multi-line headers, to prevent header injection."""
    encoding = encoding or settings.DEFAULT_CHARSET
    val = force_text(val)
    if '\n' in val or '\r' in val:
        raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))
    try:
        val.encode('ascii')
    except UnicodeEncodeError:
        if name.lower() in ADDRESS_HEADERS:
            val = ', '.join(sanitize_address(addr, encoding)
                for addr in getaddresses((val,)))
        else:
            val = Header(val, encoding).encode()
    else:
        if name.lower() == 'subject':
            val = Header(val).encode()
    return str(name), val
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def sanitize_address(addr, encoding):
    if not isinstance(addr, tuple):
        addr = parseaddr(force_text(addr))
    nm, addr = addr
    nm = Header(nm, encoding).encode()
    try:
        addr.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in addr:
            localpart, domain = addr.split('@', 1)
            localpart = str(Header(localpart, encoding))
            domain = domain.encode('idna').decode('ascii')
            addr = '@'.join([localpart, domain])
        else:
            addr = Header(addr, encoding).encode()
    return formataddr((nm, addr))
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def forbid_multi_line_headers(name, val, encoding):
    """Forbids multi-line headers, to prevent header injection."""
    encoding = encoding or settings.DEFAULT_CHARSET
    val = force_text(val)
    if '\n' in val or '\r' in val:
        raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))
    try:
        val.encode('ascii')
    except UnicodeEncodeError:
        if name.lower() in ADDRESS_HEADERS:
            val = ', '.join(sanitize_address(addr, encoding) for addr in getaddresses((val,)))
        else:
            val = Header(val, encoding).encode()
    else:
        if name.lower() == 'subject':
            val = Header(val).encode()
    return str(name), val
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
        """Create the generator for message flattening.

        outfp is the output file-like object for writing the message to.  It
        must have a write() method.

        Optional mangle_from_ is a flag that, when True (the default), escapes
        From_ lines in the body of the message by putting a `>' in front of
        them.

        Optional maxheaderlen specifies the longest length for a non-continued
        header.  When a header line is longer (in characters, with tabs
        expanded to 8 spaces) than maxheaderlen, the header will split as
        defined in the Header class.  Set maxheaderlen to zero to disable
        header wrapping.  The default is 78, as recommended (but not required)
        by RFC 2822.
        """
        self._fp = outfp
        self._mangle_from_ = mangle_from_
        self._maxheaderlen = maxheaderlen
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_get_param(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        eq(msg.get_param('bar', header='x-header'), 'two')
        eq(msg.get_param('quuz', header='x-header'), None)
        eq(msg.get_param('quuz'), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_param('foo', header='x-header'), '')
        eq(msg.get_param('bar', header='x-header'), 'one')
        eq(msg.get_param('baz', header='x-header'), 'two')
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        all = module.__all__[:]
        all.sort()
        self.assertEqual(all, [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ])
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def _bind_write_headers(msg):
  from email.header import Header
  def _write_headers(self):
      # Self refers to the Generator object
      for h, v in msg.items():
          print('%s:' % h, end=' ', file=self._fp)
          if isinstance(v, Header):
              print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp)
          else:
              # Header's got lots of smarts, so use it.
              header = Header(v, maxlinelen=self._maxheaderlen, charset='utf-8',
                              header_name=h)
              print(header.encode(), file=self._fp)
      # A blank line always separates headers from body
      print(file=self._fp)
  return _write_headers
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def forbid_multi_line_headers(name, val, encoding):
    """Forbids multi-line headers, to prevent header injection."""
    encoding = encoding or settings.DEFAULT_CHARSET
    val = force_text(val)
    if '\n' in val or '\r' in val:
        raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))
    try:
        val.encode('ascii')
    except UnicodeEncodeError:
        if name.lower() in ADDRESS_HEADERS:
            val = ', '.join(sanitize_address(addr, encoding) for addr in getaddresses((val,)))
        else:
            val = Header(val, encoding).encode()
    else:
        if name.lower() == 'subject':
            val = Header(val).encode()
    return str(name), val
项目:ZhihuSpider    作者:KEN-LJQ    | 项目源码 | 文件源码
def send_message(self, email_content):
        # ???????
        now = datetime.datetime.now()
        header = self.smtp_email_header + '[' + str(now.month) + '-' + str(now.day) + ' ' + \
            str(now.hour) + ':' + str(now.minute) + ':' + str(now.second) + ']'
        msg = MIMEText(email_content, 'plain', 'utf-8')
        msg['from'] = self.smtp_from_addr
        msg['to'] = self.smtp_to_addr
        msg['Subject'] = Header(header, 'utf-8').encode()

        # ??
        try:
            smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
            smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
            smtp_server.sendmail(self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
            smtp_server.quit()
        except Exception as e:
            if log.isEnabledFor(logging.ERROR):
                log.error("??????")
                log.exception(e)
项目:base_function    作者:Rockyzsu    | 项目源码 | 文件源码
def send_txt(self, filename):
        # ????????????????????????????????????
        self.smtp = smtplib.SMTP_SSL(port=465)
        self.smtp.connect(self.server)
        self.smtp.login(self.username, self.password)
        #self.msg = MIMEMultipart()
        self.msg = MIMEText("content", 'plain', 'utf-8')
        self.msg['to'] = self.to_mail
        self.msg['from'] = self.from_mail
        self.msg['Subject'] = "459"
        self.filename = filename + ".txt"
        self.msg['Date'] = Utils.formatdate(localtime=1)
        content = open(self.filename.decode('utf-8'), 'rb').read()
        # print content
        #self.att = MIMEText(content, 'base64', 'utf-8')
        #self.att['Content-Type'] = 'application/octet-stream'
        # self.att["Content-Disposition"] = "attachment;filename=\"%s\"" %(self.filename.encode('gb2312'))
        #self.att["Content-Disposition"] = "attachment;filename=\"%s\"" % Header(self.filename, 'gb2312')
        # print self.att["Content-Disposition"]
        #self.msg.attach(self.att)

        #self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string())
        self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string())
        self.smtp.quit()
项目:base_function    作者:Rockyzsu    | 项目源码 | 文件源码
def send_139():
    cfg = Toolkit.getUserData('data.cfg')
    sender = cfg['mail_user']
    passwd = cfg['mail_pass']
    receiver = cfg['receiver']
    msg = MIMEText('Python mail test', 'plain', 'utf-8')
    msg['From'] = Header('FromTest', 'utf-8')
    msg['To'] = Header('ToTest', 'utf-8')
    subject = 'Python SMTP Test'
    msg['Subject'] = Header(subject, 'utf-8')
    try:
        obj = smtplib.SMTP()
        obj.connect('smtp.126.com', 25)
        obj.login(sender, passwd)
        obj.sendmail(sender, receiver, msg.as_string())
    except smtplib.SMTPException, e:
        print e
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def forbid_multi_line_headers(name, val, encoding):
    """Forbids multi-line headers, to prevent header injection."""
    encoding = encoding or settings.DEFAULT_CHARSET
    val = force_text(val)
    if '\n' in val or '\r' in val:
        raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))
    try:
        val.encode('ascii')
    except UnicodeEncodeError:
        if name.lower() in ADDRESS_HEADERS:
            val = ', '.join(sanitize_address(addr, encoding) for addr in getaddresses((val,)))
        else:
            val = Header(val, encoding).encode()
    else:
        if name.lower() == 'subject':
            val = Header(val).encode()
    return str(name), val
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
        """Create the generator for message flattening.

        outfp is the output file-like object for writing the message to.  It
        must have a write() method.

        Optional mangle_from_ is a flag that, when True (the default), escapes
        From_ lines in the body of the message by putting a `>' in front of
        them.

        Optional maxheaderlen specifies the longest length for a non-continued
        header.  When a header line is longer (in characters, with tabs
        expanded to 8 spaces) than maxheaderlen, the header will split as
        defined in the Header class.  Set maxheaderlen to zero to disable
        header wrapping.  The default is 78, as recommended (but not required)
        by RFC 2822.
        """
        self._fp = outfp
        self._mangle_from_ = mangle_from_
        self._maxheaderlen = maxheaderlen
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test_get_param(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        eq(msg.get_param('bar', header='x-header'), 'two')
        eq(msg.get_param('quuz', header='x-header'), None)
        eq(msg.get_param('quuz'), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_param('foo', header='x-header'), '')
        eq(msg.get_param('bar', header='x-header'), 'one')
        eq(msg.get_param('baz', header='x-header'), 'two')
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        all = module.__all__[:]
        all.sort()
        self.assertEqual(all, [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ])
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def sanitize_address(addr, encoding='utf-8'):
    if isinstance(addr, string_types):
        addr = parseaddr(force_text(addr))
    nm, addr = addr

    try:
        nm = Header(nm, encoding).encode()
    except UnicodeEncodeError:
        nm = Header(nm, 'utf-8').encode()
    try:
        addr.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in addr:
            localpart, domain = addr.split('@', 1)
            localpart = str(Header(localpart, encoding))
            domain = domain.encode('idna').decode('ascii')
            addr = '@'.join([localpart, domain])
        else:
            addr = Header(addr, encoding).encode()
    return formataddr((nm, addr))
项目:GWMMS    作者:lvhuiyang    | 项目源码 | 文件源码
def send_mail(subject, from_addr, to_addr, content):
    """
    ??????????????? ??? ???? ???? ??
    ????????????????
    """
    password = os.getenv('MAIL_PASSWORD')
    msg = MIMEText(content, 'plain', 'utf-8')
    msg['Subject'] = Header(u'%s', 'utf-8').encode() % subject
    msg['From'] = _format_addr(u'lvhuiyang <%s>' % from_addr)
    msg['To'] = _format_addr(u'service of python_smtp <%s>' % to_addr)
    smtp_server = 'smtp.qq.com'
    smtp_port = 587
    server = smtplib.SMTP(smtp_server, smtp_port)
    server.starttls()
    server.set_debuglevel(1)
    server.login(from_addr, password)
    server.sendmail(from_addr, [to_addr], msg.as_string())
    server.quit()
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def _bind_write_headers(msg):
  from email.header import Header
  def _write_headers(self):
      # Self refers to the Generator object
      for h, v in msg.items():
          print('%s:' % h, end=' ', file=self._fp)
          if isinstance(v, Header):
              print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp)
          else:
              # Header's got lots of smarts, so use it.
              header = Header(v, maxlinelen=self._maxheaderlen, charset='utf-8',
                              header_name=h)
              print(header.encode(), file=self._fp)
      # A blank line always separates headers from body
      print(file=self._fp)
  return _write_headers
项目:henet    作者:AcrDijon    | 项目源码 | 文件源码
def send_email(tos, subject, body, smtp_config):
    msg = MIMEText(body.encode('utf8'), 'plain', 'utf-8')
    msg['To'] = email.utils.formataddr(('Recipient', ','.join(tos)))
    msg['From'] = email.utils.formataddr(('Author', smtp_config['from']))
    msg['Subject'] = Header(subject, 'utf8')

    server = smtplib.SMTP(smtp_config['host'], smtp_config.get('port', 25))
    try:
        server.ehlo()
        if server.has_extn('STARTTLS'):
            server.starttls()
            server.ehlo()

        username = smtp_config.get('username')
        if username is not None:
            server.login(username, smtp_config['password'])
        server.sendmail(smtp_config['from'], tos, msg.as_string())
        logger.debug("Mail sent")
    except Exception:
        logger.exception("Could not send the email")
    finally:
        server.quit()
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
        """Create the generator for message flattening.

        outfp is the output file-like object for writing the message to.  It
        must have a write() method.

        Optional mangle_from_ is a flag that, when True (the default), escapes
        From_ lines in the body of the message by putting a `>' in front of
        them.

        Optional maxheaderlen specifies the longest length for a non-continued
        header.  When a header line is longer (in characters, with tabs
        expanded to 8 spaces) than maxheaderlen, the header will split as
        defined in the Header class.  Set maxheaderlen to zero to disable
        header wrapping.  The default is 78, as recommended (but not required)
        by RFC 2822.
        """
        self._fp = outfp
        self._mangle_from_ = mangle_from_
        self._maxheaderlen = maxheaderlen
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def test_get_param(self):
        eq = self.assertEqual
        msg = email.message_from_string(
            "X-Header: foo=one; bar=two; baz=three\n")
        eq(msg.get_param('bar', header='x-header'), 'two')
        eq(msg.get_param('quuz', header='x-header'), None)
        eq(msg.get_param('quuz'), None)
        msg = email.message_from_string(
            'X-Header: foo; bar="one"; baz=two\n')
        eq(msg.get_param('foo', header='x-header'), '')
        eq(msg.get_param('bar', header='x-header'), 'one')
        eq(msg.get_param('baz', header='x-header'), 'two')
        # XXX: We are not RFC-2045 compliant!  We cannot parse:
        # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"'
        # msg.get_param("weird")
        # yet.
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def test__all__(self):
        module = __import__('email')
        # Can't use sorted() here due to Python 2.3 compatibility
        all = module.__all__[:]
        all.sort()
        self.assertEqual(all, [
            # Old names
            'Charset', 'Encoders', 'Errors', 'Generator',
            'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
            'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
            'MIMENonMultipart', 'MIMEText', 'Message',
            'Parser', 'Utils', 'base64MIME',
            # new names
            'base64mime', 'charset', 'encoders', 'errors', 'generator',
            'header', 'iterators', 'message', 'message_from_file',
            'message_from_string', 'mime', 'parser',
            'quopriMIME', 'quoprimime', 'utils',
            ])
项目:wizard    作者:honor100    | 项目源码 | 文件源码
def _send(self, strTitle, strContent, listToAddr):
        msg = MIMEText(strContent, 'plain', 'utf-8')
        # ?????:

        msg['From'] = self._format_addr(u'DB?? <%s>' % self.MAIL_FROM_ADDR)
        #msg['To'] = self._format_addr(listToAddr)
        msg['To'] = ','.join(listToAddr)
        msg['Subject'] = Header(strTitle, "utf-8").encode()

        server = smtplib.SMTP_SSL(self.MAIL_SMTP_SERVER, self.MAIL_SMTP_PORT)
        #server.set_debuglevel(1)

        #????????????????SMTP server
        if self.MAIL_FROM_PASSWORD != '':
            server.login(self.MAIL_FROM_ADDR, self.MAIL_FROM_PASSWORD)
        sendResult = server.sendmail(self.MAIL_FROM_ADDR, listToAddr, msg.as_string())
        server.quit()

    #??????????????????????????????????????wizard???
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def sanitize_address(addr, encoding='utf-8'):
    if isinstance(addr, string_types):
        addr = parseaddr(force_text(addr))
    nm, addr = addr

    try:
        nm = Header(nm, encoding).encode()
    except UnicodeEncodeError:
        nm = Header(nm, 'utf-8').encode()
    try:
        addr.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in addr:
            localpart, domain = addr.split('@', 1)
            localpart = str(Header(localpart, encoding))
            domain = domain.encode('idna').decode('ascii')
            addr = '@'.join([localpart, domain])
        else:
            addr = Header(addr, encoding).encode()
    return formataddr((nm, addr))
项目:fanclley    作者:guerbai    | 项目源码 | 文件源码
def book_msg(to_addr, book):
    # TODO: Give a subject.
    subject = ''
    mobi_workshop_dir = '../../mobi_workshop/'
    bookname = book+'.mobi'
    msg = MIMEMultipart()
    msg['From'] = mail_config['from_addr']
    msg['To'] = to_addr
    msg['Subject'] = Header(subject, 'utf-8').encode()
    msg.attach(MIMEText('send with file...', 'plain', 'utf-8'))
    # TODO: check if target is 0kb, if so, send_alert.
    with open(mobi_workshop_dir+book+'.mobi', 'rb') as f:
        mime = MIMEBase('*', '*/*', filename=bookname)
        mime.add_header('Content-Disposition', 'attachment', filename=bookname)
        mime.add_header('Content-ID', '<0>')
        mime.add_header('X-Attachment-Id', '0')
        mime.set_payload(f.read())
        encoders.encode_base64(mime)
        msg.attach(mime)
    return msg
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
        """Create the generator for message flattening.

        outfp is the output file-like object for writing the message to.  It
        must have a write() method.

        Optional mangle_from_ is a flag that, when True (the default), escapes
        From_ lines in the body of the message by putting a `>' in front of
        them.

        Optional maxheaderlen specifies the longest length for a non-continued
        header.  When a header line is longer (in characters, with tabs
        expanded to 8 spaces) than maxheaderlen, the header will split as
        defined in the Header class.  Set maxheaderlen to zero to disable
        header wrapping.  The default is 78, as recommended (but not required)
        by RFC 2822.
        """
        self._fp = outfp
        self._mangle_from_ = mangle_from_
        self._maxheaderlen = maxheaderlen
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _write_headers(self, msg):
        for h, v in msg.items():
            self.write('%s: ' % h)
            if isinstance(v, Header):
                self.write(v.encode(
                    maxlinelen=self._maxheaderlen, linesep=self._NL)+self._NL)
            else:
                # Header's got lots of smarts, so use it.
                header = Header(v, maxlinelen=self._maxheaderlen,
                                header_name=h)
                self.write(header.encode(linesep=self._NL)+self._NL)
        # A blank line always separates headers from body
        self.write(self._NL)

    #
    # Handlers for writing types and subtypes
    #
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _write_headers(self, msg):
        # This is almost the same as the string version, except for handling
        # strings with 8bit bytes.
        for h, v in msg._headers:
            self.write('%s: ' % h)
            if isinstance(v, Header):
                self.write(v.encode(maxlinelen=self._maxheaderlen)+NL)
            elif _has_surrogates(v):
                # If we have raw 8bit data in a byte string, we have no idea
                # what the encoding is.  There is no safe way to split this
                # string.  If it's ascii-subset, then we could do a normal
                # ascii split, but if it's multibyte then we could break the
                # string.  There's no way to know so the least harm seems to
                # be to not split the string and risk it being too long.
                self.write(v+NL)
            else:
                # Header's got lots of smarts and this string is safe...
                header = Header(v, maxlinelen=self._maxheaderlen,
                                header_name=h)
                self.write(header.encode(linesep=self._NL)+self._NL)
        # A blank line always separates headers from body
        self.write(self._NL)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_another_long_almost_unsplittable_header(self):
        eq = self.ndiffAssertEqual
        hstr = """\
bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text"""
        h = Header(hstr, continuation_ws='\t')
        eq(h.encode(), """\
bug demonstration
\t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
\tmore text""")
        h = Header(hstr.replace('\t', ' '))
        eq(h.encode(), """\
bug demonstration
 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789
 more text""")
项目:docklet    作者:unias    | 项目源码 | 文件源码
def send_beans_email(to_address, username, beans):
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    if (email_from_address in ['\'\'', '\"\"', '']):
        return
    #text = 'Dear '+ username + ':\n' + '  Your beans in docklet are less than' + beans + '.'
    text = '<html><h4>Dear '+ username + ':</h4>'
    text += '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Your beans in <a href='%s'>docklet</a> are %d now. </p>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;If your beans are less than or equal to 0, all your worksapces will be stopped.</p>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Please apply for more beans to keep your workspaces running by following link:</p>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href='%s/beans/application/'>%s/beans/application/</p>
               <br>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Note: DO NOT reply to this email!</p>
               <br><br>
               <p> <a href='http://docklet.unias.org'>Docklet Team</a>, SEI, PKU</p>
            ''' % (env.getenv("PORTAL_URL"), beans, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL"))
    text += '<p>'+  str(datetime.datetime.now()) + '</p>'
    text += '</html>'
    subject = 'Docklet beans alert'
    msg = MIMEMultipart()
    textmsg = MIMEText(text,'html','utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = to_address
    msg.attach(textmsg)
    s = smtplib.SMTP()
    s.connect()
    s.sendmail(email_from_address, to_address, msg.as_string())
    s.close()

# a class that will deal with users' requests about beans application.
项目:docklet    作者:unias    | 项目源码 | 文件源码
def send_activated_email(to_address, username):
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    if (email_from_address in ['\'\'', '\"\"', '']):
        return
    #text = 'Dear '+ username + ':\n' + '  Your account in docklet has been activated'
    text = '<html><h4>Dear '+ username + ':</h4>'
    text += '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Your account in <a href='%s'>%s</a> has been activated</p>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Enjoy your personal workspace in the cloud !</p>
               <br>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Note: DO NOT reply to this email!</p>
               <br><br>
               <p> <a href='http://docklet.unias.org'>Docklet Team</a>, SEI, PKU</p>
            ''' % (env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL"))
    text += '<p>'+  str(datetime.now()) + '</p>'
    text += '</html>'
    subject = 'Docklet account activated'
    msg = MIMEMultipart()
    textmsg = MIMEText(text,'html','utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = to_address
    msg.attach(textmsg)
    s = smtplib.SMTP()
    s.connect()
    s.sendmail(email_from_address, to_address, msg.as_string())
    s.close()
项目:docklet    作者:unias    | 项目源码 | 文件源码
def send_remind_activating_email(username):
    #admin_email_address = env.getenv('ADMIN_EMAIL_ADDRESS')
    nulladdr = ['\'\'', '\"\"', '']
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    admin_email_address = settings.get('ADMIN_EMAIL_ADDRESS')
    if (email_from_address in nulladdr or admin_email_address in nulladdr):
        return
    #text = 'Dear '+ username + ':\n' + '  Your account in docklet has been activated'
    text = '<html><h4>Dear '+ 'admin' + ':</h4>'
    text += '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;An activating request for %s in <a href='%s'>%s</a> has been sent</p>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Please check it !</p>
               <br/><br/>
               <p> Docklet Team, SEI, PKU</p>
            ''' % (username, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL"))
    text += '<p>'+  str(datetime.utcnow()) + '</p>'
    text += '</html>'
    subject = 'An activating request in Docklet has been sent'
    if admin_email_address[0] == '"':
        admins_addr = admin_email_address[1:-1].split(" ")
    else:
        admins_addr = admin_email_address.split(" ")
    alladdr=""
    for addr in admins_addr:
        alladdr = alladdr+addr+", "
    alladdr=alladdr[:-2]
    msg = MIMEMultipart()
    textmsg = MIMEText(text,'html','utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = alladdr
    msg.attach(textmsg)
    s = smtplib.SMTP()
    s.connect()
    try:
        s.sendmail(email_from_address, admins_addr, msg.as_string())
    except Exception as e:
        logger.error(e)
    s.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _write_headers(self, msg):
        for h, v in msg.items():
            print >> self._fp, '%s:' % h,
            if self._maxheaderlen == 0:
                # Explicit no-wrapping
                print >> self._fp, v
            elif isinstance(v, Header):
                # Header instances know what to do
                print >> self._fp, v.encode()
            elif _is8bitstring(v):
                # If we have raw 8bit data in a byte string, we have no idea
                # what the encoding is.  There is no safe way to split this
                # string.  If it's ascii-subset, then we could do a normal
                # ascii split, but if it's multibyte then we could break the
                # string.  There's no way to know so the least harm seems to
                # be to not split the string and risk it being too long.
                print >> self._fp, v
            else:
                # Header's got lots of smarts, so use it.  Note that this is
                # fundamentally broken though because we lose idempotency when
                # the header string is continued with tabs.  It will now be
                # continued with spaces.  This was reversedly broken before we
                # fixed bug 1974.  Either way, we lose.
                print >> self._fp, Header(
                    v, maxlinelen=self._maxheaderlen, header_name=h).encode()
        # A blank line always separates headers from body
        print >> self._fp

    #
    # Handlers for writing types and subtypes
    #