我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用email.header.Header()。
def __init__(self, outfp, mangle_from_=True, maxheaderlen=78): """Create the generator for message flattening. outfp is the output file-like object for writing the message to. It must have a write() method. Optional mangle_from_ is a flag that, when True (the default), escapes From_ lines in the body of the message by putting a `>' in front of them. Optional maxheaderlen specifies the longest length for a non-continued header. When a header line is longer (in characters, with tabs expanded to 8 spaces) than maxheaderlen, the header will split as defined in the Header class. Set maxheaderlen to zero to disable header wrapping. The default is 78, as recommended (but not required) by RFC 2822. """ self._fp = outfp self._mangle_from_ = mangle_from_ self._maxheaderlen = maxheaderlen
def test_get_param(self): eq = self.assertEqual msg = email.message_from_string( "X-Header: foo=one; bar=two; baz=three\n") eq(msg.get_param('bar', header='x-header'), 'two') eq(msg.get_param('quuz', header='x-header'), None) eq(msg.get_param('quuz'), None) msg = email.message_from_string( 'X-Header: foo; bar="one"; baz=two\n') eq(msg.get_param('foo', header='x-header'), '') eq(msg.get_param('bar', header='x-header'), 'one') eq(msg.get_param('baz', header='x-header'), 'two') # XXX: We are not RFC-2045 compliant! We cannot parse: # msg["Content-Type"] = 'text/plain; weird="hey; dolly? [you] @ <\\"home\\">?"' # msg.get_param("weird") # yet.
def test__all__(self): module = __import__('email') # Can't use sorted() here due to Python 2.3 compatibility all = module.__all__[:] all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def sanitize_address(addr, encoding='utf-8'):
    """Return *addr* formatted so that it is safe to place in a header.

    addr is either a string (parsed with parseaddr() after force_text())
    or an already split ``(name, address)`` pair.  encoding is the
    charset used for the RFC 2047 encoded words.

    The display name is always passed through Header; if it cannot be
    encoded with *encoding*, UTF-8 is used instead.  An address that is
    not pure ASCII is rewritten: the local part becomes an encoded word
    and the domain is converted with the ``idna`` codec.  The result is
    joined back together with formataddr().
    """
    if isinstance(addr, string_types):
        addr = parseaddr(force_text(addr))
    nm, addr = addr

    try:
        nm = Header(nm, encoding).encode()
    except UnicodeEncodeError:
        # The requested charset cannot represent the name.
        nm = Header(nm, 'utf-8').encode()

    try:
        addr.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in addr:
            localpart, domain = addr.split('@', 1)
            # Use encode() rather than str(): on Python 3 str(Header)
            # returns the original, still non-ASCII text, while encode()
            # yields the RFC 2047 form on both Python 2 and 3.
            localpart = Header(localpart, encoding).encode()
            domain = domain.encode('idna').decode('ascii')
            addr = '@'.join([localpart, domain])
        else:
            addr = Header(addr, encoding).encode()
    return formataddr((nm, addr))
def parse_attachment(self, message_part):
    """Extract attachment details from one MIME part.

    message_part is an email message object.  When its
    Content-Disposition header starts with ``attachment`` a dict is
    returned with these keys:

    * ``content_type`` - value of ``get_content_type()``
    * ``size``         - length of the decoded payload (0 if the part
                         has no decodable payload)
    * ``name``         - decoded file name, or None if the part does not
                         declare one
    * ``data``         - payload as returned by ``get_payload(decode=True)``

    For every other part None is returned.
    """
    # Local import: email.header is available on Python 2.5+ and 3.
    from email.header import decode_header

    content_disposition = message_part.get("Content-Disposition", None)
    if not content_disposition:
        return None
    disposition_type = content_disposition.strip().split(";")[0]
    if disposition_type.strip().lower() != "attachment":
        return None

    file_data = message_part.get_payload(decode=True)

    name = None
    filename = message_part.get_filename()
    if filename is not None:
        # Only the first decoded chunk is used, as before.
        raw_name, charset = decode_header(filename)[0]
        if charset is not None:
            # An RFC 2047 encoded word: raw_name is a byte string.
            name = raw_name.decode(charset)
        else:
            name = raw_name

    return {
        "content_type": message_part.get_content_type(),
        # get_payload(decode=True) yields None for multipart payloads.
        "size": len(file_data) if file_data is not None else 0,
        "name": name,
        "data": file_data,
    }
def forbid_multi_line_headers(name, val, encoding):
    """Reject header values with line breaks and encode what remains.

    Raises BadHeaderError when the value contains a newline or carriage
    return, which prevents header injection.  Returns the pair
    ``(str(name), encoded_value)``.
    """
    encoding = encoding or settings.DEFAULT_CHARSET
    val = force_text(val)
    if '\n' in val or '\r' in val:
        raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))

    try:
        val.encode('ascii')
    except UnicodeEncodeError:
        is_ascii = False
    else:
        is_ascii = True

    if is_ascii:
        # Pure ASCII values pass through untouched, except subjects,
        # which are always run through Header.
        if name.lower() == 'subject':
            val = Header(val).encode()
    elif name.lower() in ADDRESS_HEADERS:
        # Address headers are sanitised one address at a time.
        sanitized = (sanitize_address(addr, encoding)
                     for addr in getaddresses((val,)))
        val = ', '.join(sanitized)
    else:
        val = Header(val, encoding).encode()
    return str(name), val
def sanitize_address(addr, encoding):
    """Return *addr* formatted so that it is safe to place in a header.

    addr is either a ``(name, address)`` tuple or any other value, which
    is converted with force_text() and split with parseaddr().  encoding
    is the charset used for the RFC 2047 encoded words.

    The display name is always passed through Header.  An address that
    is not pure ASCII is rewritten: the local part becomes an encoded
    word and the domain is converted with the ``idna`` codec.  The
    result is joined back together with formataddr().
    """
    if not isinstance(addr, tuple):
        addr = parseaddr(force_text(addr))
    nm, addr = addr
    nm = Header(nm, encoding).encode()

    try:
        addr.encode('ascii')
    except UnicodeEncodeError:  # IDN
        if '@' in addr:
            localpart, domain = addr.split('@', 1)
            # Use encode() rather than str(): on Python 3 str(Header)
            # returns the original, still non-ASCII text, while encode()
            # yields the RFC 2047 form on both Python 2 and 3.
            localpart = Header(localpart, encoding).encode()
            domain = domain.encode('idna').decode('ascii')
            addr = '@'.join([localpart, domain])
        else:
            addr = Header(addr, encoding).encode()
    return formataddr((nm, addr))
def _bind_write_headers(msg): from email.header import Header def _write_headers(self): # Self refers to the Generator object for h, v in msg.items(): print('%s:' % h, end=' ', file=self._fp) if isinstance(v, Header): print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp) else: # Header's got lots of smarts, so use it. header = Header(v, maxlinelen=self._maxheaderlen, charset='utf-8', header_name=h) print(header.encode(), file=self._fp) # A blank line always separates headers from body print(file=self._fp) return _write_headers
def send_message(self, email_content):
    """Send *email_content* as a plain-text UTF-8 e-mail.

    The subject is ``self.smtp_email_header`` followed by a
    ``[month-day hour:minute:second]`` stamp of the current local time.
    Sender, recipient, server and credentials come from the ``smtp_*``
    attributes of the instance.

    Nothing is returned.  Any exception raised while connecting,
    logging in or sending is caught and logged through ``log``.
    """
    now = datetime.datetime.now()
    header = self.smtp_email_header + '[%d-%d %d:%d:%d]' % (
        now.month, now.day, now.hour, now.minute, now.second)

    msg = MIMEText(email_content, 'plain', 'utf-8')
    msg['from'] = self.smtp_from_addr
    msg['to'] = self.smtp_to_addr
    msg['Subject'] = Header(header, 'utf-8').encode()

    try:
        smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
        try:
            smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
            smtp_server.sendmail(self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
            smtp_server.quit()
        finally:
            # Release the socket even when login or sendmail failed;
            # after a successful quit() there is nothing left to close.
            smtp_server.close()
    except Exception as e:
        if log.isEnabledFor(logging.ERROR):
            log.error("??????")
            log.exception(e)
def send_txt(self, filename):
    """Send a fixed plain-text message over SMTP-over-SSL (port 465).

    filename is the base name of a text file; ``".txt"`` is appended and
    the result is stored in ``self.filename``.  The file is opened and
    read before sending, so a missing or unreadable file raises before
    anything is sent.  The SMTP client and the message are kept in
    ``self.smtp`` and ``self.msg``.

    Server, credentials and addresses come from ``self.server``,
    ``self.username``, ``self.password``, ``self.to_mail`` and
    ``self.from_mail``.
    """
    self.smtp = smtplib.SMTP_SSL(port=465)
    try:
        self.smtp.connect(self.server)
        self.smtp.login(self.username, self.password)

        self.msg = MIMEText("content", 'plain', 'utf-8')
        self.msg['to'] = self.to_mail
        self.msg['from'] = self.from_mail
        self.msg['Subject'] = "459"
        self.filename = filename + ".txt"
        self.msg['Date'] = Utils.formatdate(localtime=1)

        path = self.filename
        if isinstance(path, bytes):
            # Byte-string paths are treated as UTF-8, as before; text
            # paths are used unchanged instead of failing on .decode().
            path = path.decode('utf-8')
        with open(path, 'rb') as source:
            # TODO: the content is not attached yet (attachment support
            # as a base64 MIME part with a Content-Disposition filename
            # is still disabled); the read only checks the file.
            source.read()

        self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string())
        self.smtp.quit()
    finally:
        # Release the socket on every path; harmless after quit().
        self.smtp.close()
def send_139():
    """Send a fixed test message through smtp.126.com on port 25.

    Account name, password and receiver are read from the ``mail_user``,
    ``mail_pass`` and ``receiver`` entries of
    ``Toolkit.getUserData('data.cfg')``.  SMTP errors are printed, not
    raised.
    """
    cfg = Toolkit.getUserData('data.cfg')
    sender = cfg['mail_user']
    passwd = cfg['mail_pass']
    receiver = cfg['receiver']

    msg = MIMEText('Python mail test', 'plain', 'utf-8')
    msg['From'] = Header('FromTest', 'utf-8')
    msg['To'] = Header('ToTest', 'utf-8')
    subject = 'Python SMTP Test'
    msg['Subject'] = Header(subject, 'utf-8')

    obj = smtplib.SMTP()
    try:
        obj.connect('smtp.126.com', 25)
        obj.login(sender, passwd)
        obj.sendmail(sender, receiver, msg.as_string())
        obj.quit()
    except smtplib.SMTPException as e:
        # "except ... as" and print(e) work on Python 2.6+ and 3.
        print(e)
    finally:
        # The connection was previously never closed.
        obj.close()
def send_mail(subject, from_addr, to_addr, content):
    """Send a plain-text UTF-8 e-mail through smtp.qq.com (port 587).

    subject   -- subject text, RFC 2047 encoded as UTF-8
    from_addr -- sender address, also used as the SMTP login name
    to_addr   -- single recipient address
    content   -- message body

    The SMTP password is read from the ``MAIL_PASSWORD`` environment
    variable.  The connection is upgraded with STARTTLS and smtplib
    debug output is switched on.  Errors from smtplib propagate to the
    caller.
    """
    password = os.getenv('MAIL_PASSWORD')

    msg = MIMEText(content, 'plain', 'utf-8')
    # Encode the subject itself.  Encoding the '%s' template first turns
    # '%' into '=25', so the later "% subject" always raised TypeError.
    msg['Subject'] = Header(subject, 'utf-8').encode()
    msg['From'] = _format_addr(u'lvhuiyang <%s>' % from_addr)
    msg['To'] = _format_addr(u'service of python_smtp <%s>' % to_addr)

    smtp_server = 'smtp.qq.com'
    smtp_port = 587
    server = smtplib.SMTP(smtp_server, smtp_port)
    try:
        server.starttls()
        server.set_debuglevel(1)
        server.login(from_addr, password)
        server.sendmail(from_addr, [to_addr], msg.as_string())
        server.quit()
    finally:
        # Release the socket when any step above fails; harmless after
        # a successful quit().
        server.close()
def send_email(tos, subject, body, smtp_config):
    """Send a plain-text UTF-8 e-mail to every address in *tos*.

    tos         -- iterable of recipient addresses
    subject     -- subject text
    body        -- message text (encoded to UTF-8 before sending)
    smtp_config -- mapping with ``host`` and ``from``, optionally
                   ``port`` (default 25), ``username`` and ``password``

    STARTTLS is used when the server announces it, and login is only
    attempted when a username is configured.  Sending is best effort:
    failures are logged through ``logger`` and never raised.
    """
    msg = MIMEText(body.encode('utf8'), 'plain', 'utf-8')
    # One "Recipient <addr>" entry per address.  Joining the addresses
    # inside a single pair of angle brackets is not a valid address list.
    msg['To'] = ', '.join(
        email.utils.formataddr(('Recipient', to_addr)) for to_addr in tos)
    msg['From'] = email.utils.formataddr(('Author', smtp_config['from']))
    msg['Subject'] = Header(subject, 'utf8')

    server = smtplib.SMTP(smtp_config['host'], smtp_config.get('port', 25))
    try:
        server.ehlo()
        if server.has_extn('STARTTLS'):
            server.starttls()
            server.ehlo()
        username = smtp_config.get('username')
        if username is not None:
            server.login(username, smtp_config['password'])
        server.sendmail(smtp_config['from'], tos, msg.as_string())
        logger.debug("Mail sent")
    except Exception:
        logger.exception("Could not send the email")
    finally:
        try:
            server.quit()
        except (smtplib.SMTPException, EnvironmentError):
            # quit() fails on a connection that is already broken; just
            # drop the socket so no error escapes this function.
            server.close()
def _send(self, strTitle, strContent, listToAddr):
    """Send a plain-text UTF-8 e-mail over SMTP-over-SSL.

    strTitle   -- subject text, RFC 2047 encoded as UTF-8
    strContent -- message body
    listToAddr -- list of recipient addresses; joined with commas for
                  the ``To`` header and passed to sendmail() as is

    Server, port, sender and password come from the ``MAIL_*``
    attributes of the instance.  Login is skipped when
    ``MAIL_FROM_PASSWORD`` is the empty string.  smtplib errors
    propagate to the caller.
    """
    msg = MIMEText(strContent, 'plain', 'utf-8')
    msg['From'] = self._format_addr(u'DB?? <%s>' % self.MAIL_FROM_ADDR)
    msg['To'] = ','.join(listToAddr)
    msg['Subject'] = Header(strTitle, "utf-8").encode()

    server = smtplib.SMTP_SSL(self.MAIL_SMTP_SERVER, self.MAIL_SMTP_PORT)
    try:
        # server.set_debuglevel(1) can be enabled here to trace the
        # SMTP dialogue.
        if self.MAIL_FROM_PASSWORD != '':
            server.login(self.MAIL_FROM_ADDR, self.MAIL_FROM_PASSWORD)
        server.sendmail(self.MAIL_FROM_ADDR, listToAddr, msg.as_string())
        server.quit()
    finally:
        # Release the socket when login or sendmail fails; harmless
        # after a successful quit().
        server.close()
def book_msg(to_addr, book):
    """Build a multipart message carrying ``<book>.mobi`` as attachment.

    The file is read from ``../../mobi_workshop/``.  The sender comes
    from ``mail_config['from_addr']`` and the subject is currently
    empty.  Returns the assembled MIMEMultipart message; nothing is
    sent here.

    NOTE(review): MIMEBase('*', '*/*') gives the attachment the content
    type '*/*/*' - confirm that this is intended.
    """
    # TODO: Give a subject.
    subject = ''
    bookname = book+'.mobi'
    book_path = '../../mobi_workshop/' + bookname

    message = MIMEMultipart()
    message['From'] = mail_config['from_addr']
    message['To'] = to_addr
    message['Subject'] = Header(subject, 'utf-8').encode()
    message.attach(MIMEText('send with file...', 'plain', 'utf-8'))

    # TODO: check if target is 0kb, if so, send_alert.
    with open(book_path, 'rb') as book_file:
        attachment = MIMEBase('*', '*/*', filename=bookname)
        for header, value, params in (
                ('Content-Disposition', 'attachment', {'filename': bookname}),
                ('Content-ID', '<0>', {}),
                ('X-Attachment-Id', '0', {})):
            attachment.add_header(header, value, **params)
        attachment.set_payload(book_file.read())
        encoders.encode_base64(attachment)
        message.attach(attachment)
    return message
def _write_headers(self, msg): for h, v in msg.items(): self.write('%s: ' % h) if isinstance(v, Header): self.write(v.encode( maxlinelen=self._maxheaderlen, linesep=self._NL)+self._NL) else: # Header's got lots of smarts, so use it. header = Header(v, maxlinelen=self._maxheaderlen, header_name=h) self.write(header.encode(linesep=self._NL)+self._NL) # A blank line always separates headers from body self.write(self._NL) # # Handlers for writing types and subtypes #
def _write_headers(self, msg): # This is almost the same as the string version, except for handling # strings with 8bit bytes. for h, v in msg._headers: self.write('%s: ' % h) if isinstance(v, Header): self.write(v.encode(maxlinelen=self._maxheaderlen)+NL) elif _has_surrogates(v): # If we have raw 8bit data in a byte string, we have no idea # what the encoding is. There is no safe way to split this # string. If it's ascii-subset, then we could do a normal # ascii split, but if it's multibyte then we could break the # string. There's no way to know so the least harm seems to # be to not split the string and risk it being too long. self.write(v+NL) else: # Header's got lots of smarts and this string is safe... header = Header(v, maxlinelen=self._maxheaderlen, header_name=h) self.write(header.encode(linesep=self._NL)+self._NL) # A blank line always separates headers from body self.write(self._NL)
# Checks that Header.encode() returns a header containing one very long
# token unchanged: first with tab continuation whitespace, then with
# every tab of the input replaced by a space.
# NOTE(review): the line breaks inside the triple-quoted literals appear
# to have been lost when this snippet was flattened onto a single line -
# restore them from the upstream test before running it.
def test_another_long_almost_unsplittable_header(self): eq = self.ndiffAssertEqual hstr = """\ bug demonstration \t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 \tmore text""" h = Header(hstr, continuation_ws='\t') eq(h.encode(), """\ bug demonstration \t12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 \tmore text""") h = Header(hstr.replace('\t', ' ')) eq(h.encode(), """\ bug demonstration 12345678911234567892123456789312345678941234567895123456789612345678971234567898112345678911234567892123456789112345678911234567892123456789 more text""")
def send_beans_email(to_address, username, beans):
    """Mail an HTML notice about the user's current number of beans.

    to_address -- recipient address
    username   -- name used in the greeting
    beans      -- current amount, formatted with ``%d``

    Nothing is sent when the ``EMAIL_FROM_ADDRESS`` setting is empty or
    consists only of a pair of quotes.  The links in the body are built
    from the ``PORTAL_URL`` value of ``env``.  The message is handed to
    the SMTP server that ``smtplib.SMTP().connect()`` reaches by
    default; smtplib errors propagate to the caller.
    """
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    if email_from_address in ("''", '""', ''):
        return

    portal_url = env.getenv("PORTAL_URL")
    text = '<html><h4>Dear '+ username + ':</h4>'
    # The link to the application page is now closed with </a>; the
    # closing tag was missing before.
    text += (
        "<p> Your beans in <a href='%s'>docklet</a> are %d now. </p> "
        "<p> If your beans are less than or equal to 0, all your worksapces will be stopped.</p> "
        "<p> Please apply for more beans to keep your workspaces running by following link:</p> "
        "<p> <a href='%s/beans/application/'>%s/beans/application/</a></p> "
        "<br> "
        "<p> Note: DO NOT reply to this email!</p> "
        "<br><br> "
        "<p> <a href='http://docklet.unias.org'>Docklet Team</a>, SEI, PKU</p> "
    ) % (portal_url, beans, portal_url, portal_url)
    text += '<p>'+ str(datetime.datetime.now()) + '</p>'
    text += '</html>'

    subject = 'Docklet beans alert'
    msg = MIMEMultipart()
    textmsg = MIMEText(text,'html','utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = to_address
    msg.attach(textmsg)

    s = smtplib.SMTP()
    try:
        s.connect()
        s.sendmail(email_from_address, to_address, msg.as_string())
    finally:
        # Close the socket even when connecting or sending fails.
        s.close()

# a class that will deal with users' requests about beans application.
def send_activated_email(to_address, username):
    """Mail an HTML notice that the user's account has been activated.

    to_address -- recipient address
    username   -- name used in the greeting

    Nothing is sent when the ``EMAIL_FROM_ADDRESS`` setting is empty or
    consists only of a pair of quotes.  The link in the body is built
    from the ``PORTAL_URL`` value of ``env``.  The message is handed to
    the SMTP server that ``smtplib.SMTP().connect()`` reaches by
    default; smtplib errors propagate to the caller.
    """
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    if email_from_address in ("''", '""', ''):
        return

    portal_url = env.getenv("PORTAL_URL")
    text = '<html><h4>Dear '+ username + ':</h4>'
    text += (
        "<p> Your account in <a href='%s'>%s</a> has been activated</p> "
        "<p> Enjoy your personal workspace in the cloud !</p> "
        "<br> "
        "<p> Note: DO NOT reply to this email!</p> "
        "<br><br> "
        "<p> <a href='http://docklet.unias.org'>Docklet Team</a>, SEI, PKU</p> "
    ) % (portal_url, portal_url)
    text += '<p>'+ str(datetime.now()) + '</p>'
    text += '</html>'

    subject = 'Docklet account activated'
    msg = MIMEMultipart()
    textmsg = MIMEText(text,'html','utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = to_address
    msg.attach(textmsg)

    s = smtplib.SMTP()
    try:
        s.connect()
        s.sendmail(email_from_address, to_address, msg.as_string())
    finally:
        # Close the socket even when connecting or sending fails.
        s.close()
def send_remind_activating_email(username):
    """Tell the administrators that *username* asked for activation.

    Sender and administrator addresses come from the
    ``EMAIL_FROM_ADDRESS`` and ``ADMIN_EMAIL_ADDRESS`` settings; nothing
    is sent when either is empty or only a pair of quotes.  The admin
    setting is a space-separated list, optionally wrapped in double
    quotes.  Errors raised by sendmail() are logged, not raised.
    """
    empty_values = ['\'\'', '\"\"', '']
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    admin_email_address = settings.get('ADMIN_EMAIL_ADDRESS')
    if email_from_address in empty_values or admin_email_address in empty_values:
        return

    body_parts = ['<html><h4>Dear ' + 'admin' + ':</h4>']
    body_parts.append((
        "<p> An activating request for %s in <a href='%s'>%s</a> has been sent</p> "
        "<p> Please check it !</p> "
        "<br/><br/> "
        "<p> Docklet Team, SEI, PKU</p> "
    ) % (username, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL")))
    body_parts.append('<p>' + str(datetime.utcnow()) + '</p>')
    body_parts.append('</html>')
    text = ''.join(body_parts)

    subject = 'An activating request in Docklet has been sent'

    # Strip the surrounding quotes, if any, before splitting the list.
    raw_admins = admin_email_address
    if raw_admins[0] == '"':
        raw_admins = raw_admins[1:-1]
    admins_addr = raw_admins.split(" ")
    alladdr = ", ".join(admins_addr)

    msg = MIMEMultipart()
    textmsg = MIMEText(text,'html','utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = alladdr
    msg.attach(textmsg)

    s = smtplib.SMTP()
    s.connect()
    try:
        s.sendmail(email_from_address, admins_addr, msg.as_string())
    except Exception as e:
        logger.error(e)
    s.close()
def _write_headers(self, msg):
    """Print every header of *msg* to self._fp, then a blank line.

    A value is printed unchanged when wrapping is disabled
    (``self._maxheaderlen == 0``) or when _is8bitstring() reports raw
    8bit data; Header instances encode themselves and every other value
    is wrapped in a Header first.  (Python 2 print syntax.)
    """
    for name, value in msg.items():
        print >> self._fp, '%s:' % name,
        if self._maxheaderlen == 0:
            # Explicit no-wrapping
            rendered = value
        elif isinstance(value, Header):
            # Header instances know what to do
            rendered = value.encode()
        elif _is8bitstring(value):
            # If we have raw 8bit data in a byte string, we have no idea
            # what the encoding is.  There is no safe way to split this
            # string.  If it's ascii-subset, then we could do a normal
            # ascii split, but if it's multibyte then we could break the
            # string.  There's no way to know so the least harm seems to
            # be to not split the string and risk it being too long.
            rendered = value
        else:
            # Header's got lots of smarts, so use it.  Note that this is
            # fundamentally broken though because we lose idempotency when
            # the header string is continued with tabs.  It will now be
            # continued with spaces.  This was reversedly broken before we
            # fixed bug 1974.  Either way, we lose.
            rendered = Header(
                value, maxlinelen=self._maxheaderlen, header_name=name).encode()
        print >> self._fp, rendered
    # A blank line always separates headers from body
    print >> self._fp

#
# Handlers for writing types and subtypes
#