我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用email.message.EmailMessage()。
def add(self, location: str, content_type: str, payload: str, encoding: str = 'quoted-printable') -> None: resource = EmailMessage() if content_type == 'text/html': resource.add_header('Content-Type', 'text/html', charset='utf-8') else: resource['Content-Type'] = content_type if encoding == 'quoted-printable': resource['Content-Transfer-Encoding'] = encoding resource.set_payload(quopri.encodestring(payload.encode())) elif encoding == 'base64': resource['Content-Transfer-Encoding'] = encoding resource.set_payload(base64.b64encode(payload)) elif encoding == 'base64-encoded': # Already base64 encoded resource['Content-Transfer-Encoding'] = 'base64' resource.set_payload(payload) else: raise ValueError('invalid encoding') resource['Content-Location'] = location self._msg.attach(resource)
def send(self, target: str, subject: str, html_content_with_cids: str, inline_png_cids_filenames: {str : str}): msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.sender msg['To'] = target msg.set_content('') msg.add_alternative( html_content_with_cids, subtype='html' ) for png_cid in inline_png_cids_filenames: full_path_to_png = os.path.abspath(os.path.join( os.path.dirname(__file__), inline_png_cids_filenames[png_cid] )) with open(full_path_to_png, 'rb') as png_file: file_contents = png_file.read() msg.get_payload()[1].add_related(file_contents, 'image', 'png', cid=png_cid) with smtplib.SMTP(self.smtp_server_host, self.smtp_server_port) as smtp_connection: smtp_connection.starttls() smtp_connection.login(self.username, self.password) smtp_connection.send_message(msg)
def send_mail(app, sender='%s service <%s>' % (settings.BRAND, settings.SERVER_EMAIL), recipients=tuple(), subject='', body=''): if not len(recipients): return async with app.smtp as conn: msg = EmailMessage() msg['Subject'] = subject msg['From'] = sender msg['To'] = ', '.join(recipients) msg.set_content(body) return await conn.send_message(msg, sender, recipients, timeout=5)
def __init__(self, _factory=None, *, policy=compat32): """_factory is called with no arguments to create a new message obj The policy keyword specifies a policy object that controls a number of aspects of the parser's operation. The default policy maintains backward compatibility. """ self.policy = policy self._factory_kwds = lambda: {'policy': self.policy} if _factory is None: # What this should be: #self._factory = policy.default_message_factory # but, because we are post 3.4 feature freeze, fix with temp hack: if self.policy is compat32: self._factory = message.Message else: self._factory = message.EmailMessage else: self._factory = _factory try: _factory(policy=self.policy) except TypeError: # Assume this is an old-style factory self._factory_kwds = lambda: {} self._input = BufferedSubFile() self._msgstack = [] self._parse = self._parsegen().__next__ self._cur = None self._last = None self._headersonly = False # Non-public interface for supporting Parser's headersonly flag
def __init__(self): self._msg = EmailMessage() self._msg['MIME-Version'] = '1.0' self._msg.add_header('Content-Type', 'multipart/related', type='text/html')
def send_help_email(username:str, user_email:str, subject:str, message:str) -> bool: """ Sends an email to the netsoc email address containing the help data, CC'ing all the SysAdmins and the user requesting help. This enables us to reply to the email directly instead of copypasting the from address and disconnecting history. :param username the user requesting help :param user_email the user's email address :param subject the subject of the user's help requests :param message the user's actual message """ message_body = \ """ From: %s\n Email: %s %s PS: Please "Reply All" to the emails so that you get a quicker response."""%( username, user_email, message) msg = EmailMessage() msg.set_content(message_body) msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS msg["To"] = p.NETSOC_EMAIL_ADDRESS msg["Subject"] = "[Netsoc Help] " + subject msg["Cc"] = tuple(p.SYSADMIN_EMAILS + [user_email]) try: with smtplib.SMTP("smtp.sendgrid.net", 587) as s: s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD) s.send_message(msg) except: return False return True
def send_sudo_request_email(username:str, user_email:str): """ Sends an email notifying SysAdmins that a user has requested an account on feynman. :param username the server username of the user who made the request. :param user_email the email address of that user to contact them for vetting. """ message_body = \ """ Hi {username}, Thank you for making a request for an account with sudo privileges on feynman.netsoc.co. We will be in touch shortly. Best, The UCC Netsoc SysAdmin Team. PS: Please "Reply All" to the emails so that you get a quicker response. """.format(username=username) msg = EmailMessage() msg.set_content(message_body) msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS msg["To"] = p.NETSOC_EMAIL_ADDRESS msg["Subject"] = "[Netsoc Help] Sudo request on Feynman for {user}".format( user=username) msg["Cc"] = tuple(p.SYSADMIN_EMAILS + [user_email]) with smtplib.SMTP("smtp.sendgrid.net", 587) as s: s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD) s.send_message(msg)
def signup(req): if any(map(lambda key: key not in req.json, ["name", "email", "password"])): logger.debug(f"Request is {req.json} but some arguments are missing.") raise InvalidUsage("Missing argument") if not await User.is_free(req.json["name"], req.json["email"]): logger.debug(f"Request is {req.json} but name or email is already taken") raise InvalidUsage("Username or email already taken") guest = Guest(req.json["name"], req.json["email"], User.hashpwd(req.json["password"])) chlg = await challenges.create_for(guest) logger.info(f"Guest signed up with name: {guest.name} and email: {guest.email}. Challenge generated: {chlg}") with open("mails" + sep + "challenge.txt") as mailtext: mail = EmailMessage() mail.set_content(mailtext.read().format(domain=req.app.config.domain, scheme="https" if req.app.config.schemessl else "http", challenge=chlg)) mail["Subject"] = "WebGames Registration Challenge" mail["From"] = req.app.config.smtpuser mail["To"] = guest.email with SMTP(req.app.config.smtphost, req.app.config.smtpport) as smtp: if req.app.config.smtpssl: smtp.ehlo() smtp.starttls() smtp.ehlo() else: smtp.helo() smtp.login(req.app.config.smtpuser, req.app.config.smtppwd) smtp.send_message(mail) return text(f"Challenge sent to {guest.email}")
def emit(self, record): """ Emit a record. Format the record and send it to the specified addressees. """ try: import smtplib from email.message import EmailMessage import email.utils port = self.mailport if not port: port = smtplib.SMTP_PORT smtp = smtplib.SMTP(self.mailhost, port, timeout=self.timeout) msg = EmailMessage() msg['From'] = self.fromaddr msg['To'] = ','.join(self.toaddrs) msg['Subject'] = self.getSubject(record) msg['Date'] = email.utils.localtime() msg.set_content(self.format(record)) if self.username: if self.secure is not None: smtp.ehlo() smtp.starttls(*self.secure) smtp.ehlo() smtp.login(self.username, self.password) smtp.send_message(msg) smtp.quit() except Exception: self.handleError(record)
def send_email(self: celery.Task, to_name: str, to_addr: str, subject: str, text: str, html: str): """Send an email to a single address.""" # WARNING: when changing the signature of this function, also change the # self.retry() call below. cfg = current_app.config # Construct the message msg = EmailMessage() msg['Subject'] = subject msg['From'] = Address(cfg['MAIL_DEFAULT_FROM_NAME'], addr_spec=cfg['MAIL_DEFAULT_FROM_ADDR']) msg['To'] = (Address(to_name, addr_spec=to_addr),) msg.set_content(text) msg.add_alternative(html, subtype='html') # Refuse to send mail when we're testing. if cfg['TESTING']: log.warning('not sending mail to %s <%s> because we are TESTING', to_name, to_addr) return log.info('sending email to %s <%s>', to_name, to_addr) # Send the message via local SMTP server. try: with smtplib.SMTP(cfg['SMTP_HOST'], cfg['SMTP_PORT'], timeout=cfg['SMTP_TIMEOUT']) as smtp: if cfg.get('SMTP_USERNAME') and cfg.get('SMTP_PASSWORD'): smtp.login(cfg['SMTP_USERNAME'], cfg['SMTP_PASSWORD']) smtp.send_message(msg) except (IOError, OSError) as ex: log.exception('error sending email to %s <%s>, will retry later: %s', to_name, to_addr, ex) self.retry((to_name, to_addr, subject, text, html), countdown=cfg['MAIL_RETRY']) else: log.info('mail to %s <%s> successfully sent', to_name, to_addr)