Python email 模块,utils() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用email.utils模块。

项目:stgit    作者:ctmarinas    | 项目源码 | 文件源码
def __get_sender():
    """Return the sender formatted as an 'authname <authemail>' string.

    The value is taken from the ``stgit.sender`` configuration entry.  When
    that entry is empty, ``git.user()`` is asked and, only if that call
    raises ``git.GitException``, ``git.author()``.  The chosen value is split
    with ``email.utils.parseaddr``, passed through ``address_or_alias`` and
    formatted again.

    Raises CmdException when no sender could be determined.
    """
    sender = config.get('stgit.sender')
    if not sender:
        # the first lookup that does not raise wins, even if it is empty
        for lookup in (git.user, git.author):
            try:
                sender = lookup()
            except git.GitException:
                continue
            break
    if not sender:
        raise CmdException('Unknown sender name and e-mail; you should for '
                           'example set git config user.name and user.email')

    name_and_addr = email.utils.parseaddr(sender)
    return email.utils.formataddr(address_or_alias(name_and_addr))
项目:stgit    作者:ctmarinas    | 项目源码 | 文件源码
def __build_extra_headers(msg, msg_id, ref_id = None):
    """Set the date, id, threading, user-agent and reply headers on msg.

    msg    -- the message object, modified in place
    msg_id -- value stored in the Message-ID header
    ref_id -- optional id of the message being replied to; when given it is
              stored in both In-Reply-To and References
    """
    # delete first: assigning a header does not replace an existing one
    del msg['Date']
    msg['Date'] = email.utils.formatdate(localtime = True)
    msg['Message-ID'] = msg_id
    if ref_id:
        # normalise the reference to exactly one pair of angle brackets
        bracketed = '<' + ref_id.strip(' \t\n<>') + '>'
        for threading_header in ('In-Reply-To', 'References'):
            msg[threading_header] = bracketed
    msg['User-Agent'] = 'StGit/%s' % version.version

    # refresh the remaining address headers
    for address_header in ('Reply-To', 'Mail-Reply-To', 'Mail-Followup-To'):
        __update_header(msg, address_header)
项目:xd    作者:century-arcade    | 项目源码 | 文件源码
def generate_email_files(msg):
    """Yield a (filename, data, timestamp) tuple for each leaf part of msg.

    Multipart containers are skipped.  A part without a filename gets a
    generated 'part-NNN<ext>' name, where NNN is the 1-based position of the
    part among the non-multipart parts.  Parts whose name has a '.zip'
    extension are expanded through generate_zip_files() and the tuples it
    produces are yielded instead; every other part is yielded with the
    timestamp computed from the message's Date header.
    """
    upload_date = time.mktime(email.utils.parsedate(msg["Date"]))

    # multipart/* are just containers, only the leaves carry payloads
    leaf_parts = (candidate for candidate in msg.walk()
                  if candidate.get_content_maintype() != 'multipart')

    for counter, part in enumerate(leaf_parts, 1):
        # The filename comes straight from the message.  Applications should
        # really sanitize it so that an email message can't be used to
        # overwrite important files.
        filename = part.get_filename()
        if not filename:
            # fall back to a generic bag-of-bits extension
            ext = mimetypes.guess_extension(part.get_content_type()) or '.bin'
            filename = 'part-%03d%s' % (counter, ext)

        data = part.get_payload(decode=True)
        if parse_pathname(filename).ext != '.zip':
            yield filename, data, upload_date
            continue

        for member_name, member_data, member_date in generate_zip_files(data):
            yield member_name, member_data, member_date
项目:concord232    作者:JasonCarter80    | 项目源码 | 文件源码
def _send_system_email(config, subject, recips, body):
    """Send a plain-text system email through the configured SMTP host.

    Args:
        config: parser-like object; ``fromaddr`` and ``smtphost`` are read
            from its ``email`` section.
        subject: value of the Subject header.
        recips: sequence of recipient address strings, used both for the
            To header and as the SMTP envelope recipients.
        body: text of the message.

    Raises:
        MissingEmailConfig: when the ``email`` section or one of the two
            options is missing from ``config``.
    """
    try:
        fromaddr = config.get('email', 'fromaddr')
        smtphost = config.get('email', 'smtphost')
    except (configparser.NoOptionError,
            configparser.NoSectionError):
        raise MissingEmailConfig()

    msg = email.mime.text.MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = fromaddr
    msg['Date'] = email.utils.formatdate()
    msg['Message-Id'] = email.utils.make_msgid('concord232')
    # Assigning msg['To'] once per recipient would append a separate To
    # header each time; a message may carry only one, so join the addresses.
    if recips:
        msg['To'] = ', '.join(recips)

    smtp = smtplib.SMTP(smtphost)
    try:
        smtp.sendmail(fromaddr, recips, msg.as_string())
    finally:
        # always release the connection, even when sending failed
        try:
            smtp.quit()
        except smtplib.SMTPServerDisconnected:
            # the server is already gone; just drop the socket
            smtp.close()
项目:mailnex    作者:linsam    | 项目源码 | 文件源码
def encodeEmail(fullmail):
    """Encode a full email address, treating name and mailbox separately.

    Passing a whole 'Name <box@host>' string to email.header.Header encodes
    it as one unit, so the name, box and host end up wrapped in a single
    encoded string instead of being kept apart as the spec mandates.  Some
    MTAs (for example postfix in its default configuration) then append an
    @host, because they see no @ sign and do not perform RFC 2047 decoding
    (and are not required to).

    To avoid that, only the display name goes through Header; the actual
    address (box and host) is passed on verbatim.
    """
    display_name, mailbox = email.utils.parseaddr(fullmail)
    encoded_name = str(email.header.Header(display_name, 'utf-8'))
    return email.utils.formataddr((encoded_name, mailbox))

#password = getPassword("smtp", user, host, port)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def open_local_file(self, req):
    """Open the local file named by req and return it as a response object.

    Content-type, Content-length and Last-modified headers are built from
    the file's stat data and guessed MIME type.  The file is served when
    the request has no host, or when the host has no port and resolves to
    one of the names returned by self.get_names().

    Raises URLError when an OSError occurs while inspecting or opening the
    file, or when the host is not considered local.
    """
    import email.utils
    import mimetypes
    host = req.host
    filename = req.selector
    localfile = url2pathname(filename)
    try:
        file_stat = os.stat(localfile)
        byte_count = file_stat.st_size
        last_modified = email.utils.formatdate(file_stat.st_mtime,
                                               usegmt=True)
        content_type = mimetypes.guess_type(filename)[0] or 'text/plain'
        raw_headers = (
            'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
            (content_type, byte_count, last_modified))
        headers = email.message_from_string(raw_headers)
        if host:
            host, port = splitport(host)
        # port is only looked at when a host was given and split above
        is_local = not host or (
            not port and _safe_gethostbyname(host) in self.get_names())
        if is_local:
            prefix = 'file://' + host if host else 'file://'
            return addinfourl(open(localfile, 'rb'), headers,
                              prefix + filename)
    except OSError as err:
        # users shouldn't expect OSErrors coming from urlopen()
        raise URLError(err)
    raise URLError('file not on local host')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def open_local_file(self, url):
    """Use local file.

    url is split into an optional host and a file path.  The file is opened
    when no host is given, or when the host carries no port and resolves to
    one of this machine's own addresses.  The response headers
    (Content-Type, Content-Length, Last-modified) are built from the file's
    stat data and guessed MIME type.

    Raises URLError when the file cannot be stat'ed or the host is not
    local, and ValueError for a './'-relative path combined with a host.
    """
    import email.utils
    import mimetypes
    host, file = splithost(url)
    localname = url2pathname(file)
    try:
        stats = os.stat(localname)
    except OSError as e:
        # URLError takes (reason, filename) only; also passing errno would
        # raise TypeError instead of the intended URLError
        raise URLError(e.strerror, e.filename)
    size = stats.st_size
    modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    mtype = mimetypes.guess_type(url)[0]
    headers = email.message_from_string(
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (mtype or 'text/plain', size, modified))
    if not host:
        urlfile = file
        if file[:1] == '/':
            urlfile = 'file://' + file
        return addinfourl(open(localname, 'rb'), headers, urlfile)
    host, port = splitport(host)
    if not port:
        # Build a tuple of candidate addresses.  Concatenating the two
        # helper results directly either fails (str + tuple) or degrades
        # the membership test to a substring search (str + str).
        own_addrs = thishost()
        if isinstance(own_addrs, str):
            own_addrs = (own_addrs,)
        local_addrs = (localhost(),) + tuple(own_addrs)
        if socket.gethostbyname(host) in local_addrs:
            urlfile = file
            if file[:1] == '/':
                urlfile = 'file://' + file
            elif file[:2] == './':
                raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
            return addinfourl(open(localname, 'rb'), headers, urlfile)
    # a single reason string; a second argument would be taken as filename
    raise URLError('local file error: not on local host')
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def send_mail(mail_pk):
    """Queue the mail with primary key mail_pk, or mark it as ignored.

    The recipient is first passed to validate_existing_mail_domain().  If
    that raises ValidationError, an IGNORED MailStatus is recorded and
    'Ignored <pk>' is returned.  Otherwise the message is handed to a
    Backend and 'Sent <pk>' is returned.

    Raises RejectForbiddenTransition when recording the IGNORED status
    raises django_fsm.TransitionNotAllowed.
    """
    mail = Mail.objects.get(pk=mail_pk)
    try:
        # Check if the recipient's domain has a MX record
        validate_existing_mail_domain(mail.recipient)
    except ValidationError as exc:
        # No MX record, set status to IGNORED
        log.info('[{}] Ignored recipient {}, MX check failed'.format(
            mail.identifier, mail.recipient))
        try:
            MailStatus.objects.create(
                mail=mail, status=MailStatus.IGNORED,
                creation_date=utc_now(), raw_msg=exc)
        except django_fsm.TransitionNotAllowed as transition_error:
            raise RejectForbiddenTransition(mail, transition_error)
        return 'Ignored {}'.format(mail.pk)

    log.info('[{}] Queueing email for recipient {}'.format(
        mail.identifier, mail.recipient))
    # explicitly gets the backend asking for systematic DSN.
    status_class_path = 'munch.apps.campaigns.models.MailStatus'
    envelope_task_path = 'munch.apps.campaigns.utils.get_envelope'
    record_task_path = 'munch.apps.campaigns.utils.record_status'
    backend = Backend(
        mailstatus_class_path=status_class_path,
        build_envelope_task_path=envelope_task_path,
        record_status_task_path=record_task_path)
    backend.send_message(
        mail.identifier, mail.recipient, mail.get_headers(), attempts=0)
    return 'Sent {}'.format(mail.pk)
项目:anki-onenote-importer    作者:tmpethick    | 项目源码 | 文件源码
def _timestamp(self, header_name):
    """
    Return the date in the named header of self.message as a timestamp.

    None is returned when the header is absent or when its value cannot be
    parsed as a date.
    """
    raw_value = self.message[header_name]
    parsed = None if raw_value is None else email.utils.parsedate_tz(raw_value)
    return None if parsed is None else email.utils.mktime_tz(parsed)
项目:stgit    作者:ctmarinas    | 项目源码 | 文件源码
def __addr_list(msg, header):
    """Return the bare addresses found in every occurrence of header.

    Display names are dropped; a missing header gives an empty list.
    """
    header_values = msg.get_all(header, [])
    pairs = email.utils.getaddresses(header_values)
    return [pair[1] for pair in pairs]
项目:stgit    作者:ctmarinas    | 项目源码 | 文件源码
def __send_message(type, tmpl, options, *args):
    """Build one message of the given kind and deliver or print it.

    type    -- 'cover' or 'patch'; selects the builder function
    tmpl    -- template handed to the builder
    options -- option object; its mbox, git, smtp_server and sleep
               attributes are read here
    args    -- extra builder arguments; for 'patch', args[1] and args[2]
               are the patch number and the total number of patches

    Returns the generated Message-ID.
    """
    # both entries are built eagerly, so args[0] is read for covers as well
    builders = {
        'cover': (__build_cover, 'the cover message'),
        'patch': (__build_message, 'patch "%s"' % args[0]),
    }
    build, description = builders[type]
    if type == 'patch':
        patch_nr, total_nr = args[1], args[2]

    msg_id = email.utils.make_msgid('stgit')
    msg = build(tmpl, msg_id, options, *args)
    msg_str = msg.as_string(options.mbox)

    if options.mbox:
        # mbox mode only writes the message out, nothing is sent
        out.stdout_raw(msg_str + '\n')
        return msg_id

    if not options.git:
        from_addr, to_addrs = __parse_addresses(msg)
        out.start('Sending ' + description)

    smtpserver = options.smtp_server or config.get('stgit.smtpserver')
    if options.git:
        __send_message_git(msg, options)
    elif smtpserver.startswith('/'):
        # Use the sendmail tool
        __send_message_sendmail(smtpserver, msg_str)
    else:
        # Use the SMTP server (we have host and port information)
        __send_message_smtp(smtpserver, from_addr, to_addrs, msg_str, options)

    # give recipients a chance of receiving related patches in correct order
    if type == 'cover':
        more_to_come = True
    else:
        more_to_come = type == 'patch' and patch_nr < total_nr
    if more_to_come:
        time.sleep(options.sleep or config.getint('stgit.smtpdelay'))

    if not options.git:
        out.done()
    return msg_id
项目:stgit    作者:ctmarinas    | 项目源码 | 文件源码
def __update_header(msg, header, addr='', ignore=()):
    """Rewrite an address header of msg and return the addresses it holds.

    The addresses already present in header are combined with addr.  Entries
    without an address are dropped, the rest are passed through
    address_or_alias(), and any whose address appears in ignore are removed.
    The header is deleted and, if anything is left, set again to the
    comma-separated formatted addresses.

    Returns the set of bare addresses that were kept.
    """
    parsed_pairs = email.utils.getaddresses(msg.get_all(header, []) + [addr])
    del msg[header]

    kept_pairs = []
    for parsed in parsed_pairs:
        # skip pairs without an address
        if not parsed[1]:
            continue
        resolved = address_or_alias(parsed)
        # filter out the addresses the caller asked to leave out
        if resolved[1] not in ignore:
            kept_pairs.append(resolved)

    if kept_pairs:
        formatted = [email.utils.formataddr(pair) for pair in kept_pairs]
        msg[header] = ', '.join(formatted)
    return {pair[1] for pair in kept_pairs}
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def open_local_file(self, req):
    """Open the local file named by req and return it as a response object.

    Content-type, Content-length and Last-modified headers are built from
    the file's stat data and guessed MIME type.  The file is served when
    the request has no host, or when the host has no port and resolves to
    one of the names returned by self.get_names().

    Raises URLError when an OSError occurs while inspecting or opening the
    file, or when the host is not considered local.
    """
    import email.utils
    import mimetypes
    host = req.host
    filename = req.selector
    localfile = url2pathname(filename)
    try:
        file_stat = os.stat(localfile)
        byte_count = file_stat.st_size
        last_modified = email.utils.formatdate(file_stat.st_mtime,
                                               usegmt=True)
        content_type = mimetypes.guess_type(filename)[0] or 'text/plain'
        raw_headers = (
            'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
            (content_type, byte_count, last_modified))
        headers = email.message_from_string(raw_headers)
        if host:
            host, port = splitport(host)
        # port is only looked at when a host was given and split above
        is_local = not host or (
            not port and _safe_gethostbyname(host) in self.get_names())
        if is_local:
            prefix = 'file://' + host if host else 'file://'
            return addinfourl(open(localfile, 'rb'), headers,
                              prefix + filename)
    except OSError as err:
        # users shouldn't expect OSErrors coming from urlopen()
        raise URLError(err)
    raise URLError('file not on local host')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def open_local_file(self, url):
    """Use local file.

    url is split into an optional host and a file path.  The file is opened
    when no host is given, or when the host carries no port and resolves to
    localhost() or one of the thishost() addresses.  The response headers
    (Content-Type, Content-Length, Last-modified) are built from the file's
    stat data and guessed MIME type.

    Raises URLError when the file cannot be stat'ed or the host is not
    local, and ValueError for a './'-relative path combined with a host.
    """
    import email.utils
    import mimetypes
    host, file = splithost(url)
    localname = url2pathname(file)
    try:
        file_stat = os.stat(localname)
    except OSError as err:
        raise URLError(err.strerror, err.filename)

    byte_count = file_stat.st_size
    last_modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(url)[0] or 'text/plain'
    headers = email.message_from_string(
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (content_type, byte_count, last_modified))

    is_rooted = file[:1] == '/'
    if not host:
        urlfile = 'file://' + file if is_rooted else file
        return addinfourl(open(localname, 'rb'), headers, urlfile)

    host, port = splitport(host)
    # the host is only resolved when it carries no port
    if port or socket.gethostbyname(host) not in ((localhost(),) + thishost()):
        raise URLError('local file error: not on local host')

    if is_rooted:
        urlfile = 'file://' + file
    elif file[:2] == './':
        raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
    else:
        urlfile = file
    return addinfourl(open(localname, 'rb'), headers, urlfile)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def open_local_file(self, req):
    """Open the local file named by req and return it as a response object.

    Content-type, Content-length and Last-modified headers are built from
    the file's stat data and guessed MIME type.  The file is served when
    the request has no host, or when the host has no port and resolves to
    one of the names returned by self.get_names().

    Raises URLError when an OSError occurs while inspecting or opening the
    file, or when the host is not considered local.
    """
    import email.utils
    import mimetypes
    host = req.host
    filename = req.selector
    localfile = url2pathname(filename)
    try:
        file_stat = os.stat(localfile)
        byte_count = file_stat.st_size
        last_modified = email.utils.formatdate(file_stat.st_mtime,
                                               usegmt=True)
        content_type = mimetypes.guess_type(filename)[0] or 'text/plain'
        raw_headers = (
            'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
            (content_type, byte_count, last_modified))
        headers = email.message_from_string(raw_headers)
        if host:
            host, port = splitport(host)
        # port is only looked at when a host was given and split above
        is_local = not host or (
            not port and _safe_gethostbyname(host) in self.get_names())
        if is_local:
            prefix = 'file://' + host if host else 'file://'
            return addinfourl(open(localfile, 'rb'), headers,
                              prefix + filename)
    except OSError as err:
        # users shouldn't expect OSErrors coming from urlopen()
        raise URLError(err)
    raise URLError('file not on local host')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def open_local_file(self, url):
    """Use local file.

    url is split into an optional host and a file path.  The file is opened
    when no host is given, or when the host carries no port and resolves to
    localhost() or one of the thishost() addresses.  The response headers
    (Content-Type, Content-Length, Last-modified) are built from the file's
    stat data and guessed MIME type.

    Raises URLError when the file cannot be stat'ed or the host is not
    local, and ValueError for a './'-relative path combined with a host.
    """
    import email.utils
    import mimetypes
    host, file = splithost(url)
    localname = url2pathname(file)
    try:
        file_stat = os.stat(localname)
    except OSError as err:
        raise URLError(err.strerror, err.filename)

    byte_count = file_stat.st_size
    last_modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(url)[0] or 'text/plain'
    headers = email.message_from_string(
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (content_type, byte_count, last_modified))

    is_rooted = file[:1] == '/'
    if not host:
        urlfile = 'file://' + file if is_rooted else file
        return addinfourl(open(localname, 'rb'), headers, urlfile)

    host, port = splitport(host)
    # the host is only resolved when it carries no port
    if port or socket.gethostbyname(host) not in ((localhost(),) + thishost()):
        raise URLError('local file error: not on local host')

    if is_rooted:
        urlfile = 'file://' + file
    elif file[:2] == './':
        raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
    else:
        urlfile = file
    return addinfourl(open(localname, 'rb'), headers, urlfile)
项目:anki-onenote-importer    作者:tmpethick    | 项目源码 | 文件源码
def _address(self, header_name):
        """Return a sorted list of the email addresses in the given header.

        The header is read from ``self.message``; a missing or empty header
        gives an empty list.  Each returned address has been passed through
        the local ``encode`` helper (UTF-8).

        As a side effect ``self.names`` is updated with a mapping from each
        address found to its display name, or to ``None`` when the address
        has no display name.
        """
        def decode(text, encoding):
            """Decode text with the given encoding.

            The text is returned unchanged when encoding is None or when a
            UnicodeDecodeError occurs.
            """
            # NOTE(review): relies on text having a .decode() method, i.e.
            # a byte string -- confirm which Python version this targets.
            if encoding is None:
                return text
            try:
                return text.decode(encoding)
            except UnicodeDecodeError:
                return text

        def encode(text):
            """Encode text to UTF-8.

            The text is returned unchanged when a UnicodeEncodeError occurs.
            """
            try:
                return text.encode('utf-8')
            except UnicodeEncodeError:
                return text
        header_value = self.message[header_name]
        if not header_value:
            return []
        # maps address -> display name (None when there is no name)
        result = dict()
        # join folded header lines before decoding
        header_value = header_value.replace('\n', ' ')
        pieces = email.header.decode_header(header_value)
        pieces = [decode(text, encoding) for text, encoding in pieces]
        header_value = u"".join(pieces).strip()
        # Each parseaddr() call is used for one address only: after an
        # address is recorded, everything up to and including it is cut off
        # the front of header_value and the remainder is parsed again.
        name, address = email.utils.parseaddr(header_value)
        while address:
            result[address] = name or None
            # position just past the address text
            # NOTE(review): find() returns -1 when the address does not
            # occur verbatim in header_value -- confirm this cannot happen.
            index = header_value.find(address) + len(address)
            if index >= len(header_value):
                break
            # step over the closing angle bracket of 'Name <address>'
            if header_value[index] == '>':
                index += 1
            if index >= len(header_value):
                break
            # step over the comma separating this address from the next
            if header_value[index] == ',':
                index += 1
            header_value = header_value[index:].strip()
            name, address = email.utils.parseaddr(header_value)
        self.names.update(result)
        addresses = [encode(address) for address in result.keys()]
        return sorted(addresses)