我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用email.utils()。
def __get_sender(): """Return the 'authname <authemail>' string as read from the configuration file """ sender = config.get('stgit.sender') if not sender: try: sender = git.user() except git.GitException: try: sender = git.author() except git.GitException: pass if not sender: raise CmdException('Unknown sender name and e-mail; you should for ' 'example set git config user.name and user.email') sender = email.utils.parseaddr(sender) return email.utils.formataddr(address_or_alias(sender))
def __build_extra_headers(msg, msg_id, ref_id = None): """Build extra email headers and encoding """ del msg['Date'] msg['Date'] = email.utils.formatdate(localtime = True) msg['Message-ID'] = msg_id if ref_id: # make sure the ref id has the angle brackets ref_id = '<%s>' % ref_id.strip(' \t\n<>') msg['In-Reply-To'] = ref_id msg['References'] = ref_id msg['User-Agent'] = 'StGit/%s' % version.version # update other address headers __update_header(msg, 'Reply-To') __update_header(msg, 'Mail-Reply-To') __update_header(msg, 'Mail-Followup-To')
def generate_email_files(msg): counter = 1 upload_date = time.mktime(email.utils.parsedate(msg["Date"])) for part in msg.walk(): # multipart/* are just containers if part.get_content_maintype() == 'multipart': continue # Applications should really sanitize the given filename so that an # email message can't be used to overwrite important files filename = part.get_filename() if not filename: ext = mimetypes.guess_extension(part.get_content_type()) if not ext: # Use a generic bag-of-bits extension ext = '.bin' filename = 'part-%03d%s' % (counter, ext) counter += 1 data = part.get_payload(decode=True) if parse_pathname(filename).ext == '.zip': for zipfn, zipdata, zipdt in generate_zip_files(data): yield zipfn, zipdata, zipdt else: yield filename, data, upload_date
def _send_system_email(config, subject, recips, body): try: fromaddr = config.get('email', 'fromaddr') smtphost = config.get('email', 'smtphost') except (configparser.NoOptionError, configparser.NoSectionError): raise MissingEmailConfig() msg = email.mime.text.MIMEText(body) msg['Subject'] = subject msg['From'] = fromaddr msg['Date'] = email.utils.formatdate() msg['Message-Id'] = email.utils.make_msgid('concord232') for addr in recips: msg['To'] = addr smtp = smtplib.SMTP(smtphost) smtp.sendmail(fromaddr, recips, msg.as_string()) smtp.quit()
def encodeEmail(fullmail): """Encode a full email address. This is needed because the email.headers.Header format encodes full strings instead of address parts. The result is both the name, box and host get wrapped up in a single string instead of separately as mandated by the spec. The result of this is some MTA (for example, postfix in default configuration) automatically adds an @host to addresses that lack the @ sign, and it doesn't do RFC2046 decoding to see that (and by that spec it shouldn't have to) So, this function encodes the user name part separate and passes the actual address (box and host) verbatim.""" parts = email.utils.parseaddr(fullmail) p2 = (str(email.header.Header(parts[0],'utf-8')), parts[1]) return email.utils.formataddr(p2) #password = getPassword("smtp", user, host, port)
def open_local_file(self, req): import email.utils import mimetypes host = req.host filename = req.selector localfile = url2pathname(filename) try: stats = os.stat(localfile) size = stats.st_size modified = email.utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(filename)[0] headers = email.message_from_string( 'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified)) if host: host, port = splitport(host) if not host or \ (not port and _safe_gethostbyname(host) in self.get_names()): if host: origurl = 'file://' + host + filename else: origurl = 'file://' + filename return addinfourl(open(localfile, 'rb'), headers, origurl) except OSError as msg: # users shouldn't expect OSErrors coming from urlopen() raise URLError(msg) raise URLError('file not on local host')
def open_local_file(self, url): """Use local file.""" import mimetypes, email.utils from io import StringIO host, file = splithost(url) localname = url2pathname(file) try: stats = os.stat(localname) except OSError as e: raise URLError(e.errno, e.strerror, e.filename) size = stats.st_size modified = email.utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(url)[0] headers = email.message_from_string( 'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified)) if not host: urlfile = file if file[:1] == '/': urlfile = 'file://' + file return addinfourl(open(localname, 'rb'), headers, urlfile) host, port = splitport(host) if (not port and socket.gethostbyname(host) in (localhost() + thishost())): urlfile = file if file[:1] == '/': urlfile = 'file://' + file elif file[:2] == './': raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url) return addinfourl(open(localname, 'rb'), headers, urlfile) raise URLError('local file error', 'not on local host')
def send_mail(mail_pk): m = Mail.objects.get(pk=mail_pk) try: # Check if the recipient's domain has a MX record validate_existing_mail_domain(m.recipient) except ValidationError as exc: # No MX record, set status to IGNORED log.info('[{}] Ignored recipient {}, MX check failed'.format( m.identifier, m.recipient)) try: MailStatus.objects.create( mail=m, status=MailStatus.IGNORED, creation_date=utc_now(), raw_msg=exc) except django_fsm.TransitionNotAllowed as e: raise RejectForbiddenTransition(m, e) return 'Ignored {}'.format(m.pk) else: log.info('[{}] Queueing email for recipient {}'.format( m.identifier, m.recipient)) # explicitly gets the backend asking for systematic DSN. backend = Backend( mailstatus_class_path='munch.apps.campaigns.models.MailStatus', build_envelope_task_path=( 'munch.apps.campaigns.utils.get_envelope'), record_status_task_path=( 'munch.apps.campaigns.utils.record_status')) backend.send_message( m.identifier, m.recipient, m.get_headers(), attempts=0) return 'Sent {}'.format(m.pk)
def _timestamp(self, header_name): """ Returns the timestamp in the specified header. """ header = self.message[header_name] if header is None: return None data = email.utils.parsedate_tz(header) if data is None: return None return email.utils.mktime_tz(data)
def __addr_list(msg, header): return [addr for name, addr in email.utils.getaddresses(msg.get_all(header, []))]
def __send_message(type, tmpl, options, *args): """Message sending dispatcher. """ (build, outstr) = {'cover': (__build_cover, 'the cover message'), 'patch': (__build_message, 'patch "%s"' % args[0])}[type] if type == 'patch': (patch_nr, total_nr) = (args[1], args[2]) msg_id = email.utils.make_msgid('stgit') msg = build(tmpl, msg_id, options, *args) msg_str = msg.as_string(options.mbox) if options.mbox: out.stdout_raw(msg_str + '\n') return msg_id if not options.git: from_addr, to_addrs = __parse_addresses(msg) out.start('Sending ' + outstr) smtpserver = options.smtp_server or config.get('stgit.smtpserver') if options.git: __send_message_git(msg, options) elif smtpserver.startswith('/'): # Use the sendmail tool __send_message_sendmail(smtpserver, msg_str) else: # Use the SMTP server (we have host and port information) __send_message_smtp(smtpserver, from_addr, to_addrs, msg_str, options) # give recipients a chance of receiving related patches in correct order if type == 'cover' or (type == 'patch' and patch_nr < total_nr): sleep = options.sleep or config.getint('stgit.smtpdelay') time.sleep(sleep) if not options.git: out.done() return msg_id
def __update_header(msg, header, addr = '', ignore = ()): addr_pairs = email.utils.getaddresses(msg.get_all(header, []) + [addr]) del msg[header] # remove pairs without an address and resolve the aliases addr_pairs = [address_or_alias(name_addr) for name_addr in addr_pairs if name_addr[1]] # remove the duplicates and filter the addresses addr_pairs = [name_addr for name_addr in addr_pairs if name_addr[1] not in ignore] if addr_pairs: msg[header] = ', '.join(map(email.utils.formataddr, addr_pairs)) return set(addr for _, addr in addr_pairs)
def open_local_file(self, req): import email.utils import mimetypes host = req.host filename = req.selector localfile = url2pathname(filename) try: stats = os.stat(localfile) size = stats.st_size modified = email.utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(filename)[0] headers = email.message_from_string( 'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified)) if host: host, port = splitport(host) if not host or \ (not port and _safe_gethostbyname(host) in self.get_names()): if host: origurl = 'file://' + host + filename else: origurl = 'file://' + filename return addinfourl(open(localfile, 'rb'), headers, origurl) except OSError as exp: # users shouldn't expect OSErrors coming from urlopen() raise URLError(exp) raise URLError('file not on local host')
def open_local_file(self, url): """Use local file.""" import email.utils import mimetypes host, file = splithost(url) localname = url2pathname(file) try: stats = os.stat(localname) except OSError as e: raise URLError(e.strerror, e.filename) size = stats.st_size modified = email.utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(url)[0] headers = email.message_from_string( 'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified)) if not host: urlfile = file if file[:1] == '/': urlfile = 'file://' + file return addinfourl(open(localname, 'rb'), headers, urlfile) host, port = splitport(host) if (not port and socket.gethostbyname(host) in ((localhost(),) + thishost())): urlfile = file if file[:1] == '/': urlfile = 'file://' + file elif file[:2] == './': raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url) return addinfourl(open(localname, 'rb'), headers, urlfile) raise URLError('local file error: not on local host')
def _address(self, header_name): """Returns a list with the email addresses in the specified header.""" def decode(text, encoding): """Decode a text. If an exception occurs when decoding returns the original text""" if encoding is None: return text try: return text.decode(encoding) except UnicodeDecodeError: return text def encode(text): """Encode a text to utf-8. If an exception occurs when encoding returns the original text""" try: return text.encode('utf-8') except UnicodeEncodeError: return text header_value = self.message[header_name] if not header_value: return [] result = dict() header_value = header_value.replace('\n', ' ') pieces = email.header.decode_header(header_value) pieces = [decode(text, encoding) for text, encoding in pieces] header_value = u"".join(pieces).strip() name, address = email.utils.parseaddr(header_value) while address: result[address] = name or None index = header_value.find(address) + len(address) if index >= len(header_value): break if header_value[index] == '>': index += 1 if index >= len(header_value): break if header_value[index] == ',': index += 1 header_value = header_value[index:].strip() name, address = email.utils.parseaddr(header_value) self.names.update(result) addresses = [encode(address) for address in result.keys()] return sorted(addresses)