我们从Python开源项目中，提取了以下33个代码示例，用于说明如何使用email.mime.text.MIMEText()。
def mime_multipart(_, context, arg):
    """Compose a MIME multipart document and return it as a string.

    Example:

        "UserData": {
          "Fn::Base64": {
            "CFPP::MimeMultipart": [
              ["text/x-shellscript", {"CFPP::FileToString": "userdata.sh"}],
              ["text/cloud-config", {"CFPP::FileToString": "cloud-init.yaml"}]
            ]
          }
        }

    Args:
        _: Unused.
        context: Sequence of strings giving the path in the template; it is
            hashed to derive a stable multipart boundary.
        arg: Iterable of ``[mime_type, contents]`` pairs, one per part.

    Returns:
        The serialized multipart message.
    """
    _raise_unless_mime_params(context, arg)
    mime_doc = MIMEMultipart()
    # Set the boundary explicitly so that it is stable based on the path in
    # the template.
    path = ".".join(context)
    if not isinstance(path, bytes):
        # hashlib only accepts bytes on Python 3.
        path = path.encode("utf-8")
    mime_doc.set_boundary(
        "=" * 10 + hashlib.sha1(path).hexdigest() + "=" * 3)
    for mime_type, contents in arg:
        # MIMEText takes only the subtype ("x-shellscript"), not the full
        # "text/x-shellscript" type. The previous code passed the part's
        # contents here and ignored mime_type entirely.
        subtype = mime_type.split("/", 1)[-1]
        sub_message = MIMEText(contents, subtype, sys.getdefaultencoding())
        mime_doc.attach(sub_message)
    return mime_doc.as_string()
def test_header_splitter(self):
    """A long structured header is folded at a ';' with a tab continuation."""
    message = MIMEText('')
    # It'd be great if we could use add_header() here, but that doesn't
    # guarantee an order of the parameters.
    message['X-Foobar-Spoink-Defrobnit'] = (
        'wasnipoop; giraffes="very-long-necked-animals"; '
        'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"')
    buf = StringIO()
    Generator(buf).flatten(message)
    # The expected wire form, one physical line per entry.
    expected_lines = [
        'Content-Type: text/plain; charset="us-ascii"',
        'MIME-Version: 1.0',
        'Content-Transfer-Encoding: 7bit',
        'X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals";',
        '\tspooge="yummy"; hippos="gargantuan"; marshmallows="gooey"',
        '',
    ]
    self.ndiffAssertEqual(buf.getvalue(), '\n'.join(expected_lines))
def test_mime_attachments_in_constructor(self):
    """Parts passed via ``_subparts`` become the payload, in order."""
    check = self.assertEqual
    parts = (MIMEText(''), MIMEText(''))
    container = MIMEMultipart(_subparts=parts)
    check(len(container.get_payload()), 2)
    for index, part in enumerate(parts):
        check(container.get_payload(index), part)


# A general test of parser->model->generator idempotency. IOW, read a message
# in, parse it into a message object tree, then without touching the tree,
# regenerate the plain text. The original text and the transformed text
# should be identical. Note: that we ignore the Unix-From since that may
# contain a changed date.
def test__all__(self):
    """The email package exports exactly the expected old and new names."""
    email_package = __import__('email')
    # Can't use sorted() here due to Python 2.3 compatibility
    exported = email_package.__all__[:]
    exported.sort()
    old_names = [
        'Charset', 'Encoders', 'Errors', 'Generator',
        'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
        'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
        'MIMENonMultipart', 'MIMEText', 'Message',
        'Parser', 'Utils', 'base64MIME',
    ]
    new_names = [
        'base64mime', 'charset', 'encoders', 'errors', 'generator',
        'header', 'iterators', 'message', 'message_from_file',
        'message_from_string', 'mime', 'parser',
        'quopriMIME', 'quoprimime', 'utils',
    ]
    self.assertEqual(exported, old_names + new_names)
def CreateMessage(sender, to, subject, message_text):
    """Create a message for an email.

    Args:
        sender: Email address of the sender.
        to: Email address of the receiver.
        subject: The subject of the email message.
        message_text: The text of the email message.

    Returns:
        A dict whose 'raw' entry is the base64url-encoded message as text.
    """
    message = MIMEText(message_text)
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject
    serialized = message.as_string()
    if not isinstance(serialized, bytes):
        # On Python 3 as_string() returns text, but base64 needs bytes.
        serialized = serialized.encode('utf-8')
    # Decode the result so the value is text and can be placed in a JSON body.
    return {'raw': base64.urlsafe_b64encode(serialized).decode('ascii')}
def get_mime_message(text, html_text=None, **kwargs):
    """Build a MIME message for the given text.

    Without ``html_text`` the result is a plain ``MIMEText``. With it, the
    result is a ``multipart/alternative`` message holding the plain part, the
    HTML part and a fixed base64-encoded ``report.pdf`` attachment. Extra
    keyword arguments are set as headers on the message.
    """
    if html_text:
        message = MIMEMultipart('alternative')
        for body, subtype in ((text, 'plain'), (html_text, 'html')):
            message.attach(MIMEText(body, subtype))
        attachment = MIMEBase('application', 'octet-stream')
        attachment.set_payload(b'test content')
        encoders.encode_base64(attachment)
        attachment.add_header(
            'Content-Disposition', 'attachment', filename='report.pdf')
        message.attach(attachment)
    else:
        message = MIMEText(text)
    message['X-Accept-Language'] = 'en-us, en'
    for header_name in kwargs:
        message[header_name] = kwargs[header_name]
    return message
def from_mime(cls, message, manager):
    """
    Build an ``Email`` instance from a MIME message.

    :param message: ``email.mime.text.MIMEText`` instance (a
        ``MIMEMultipart`` is split into text, HTML and attachments).
    :param manager: :py:class:`EmailManager` instance.
    :return: :py:class:`Email`
    """
    if isinstance(message, MIMEMultipart):
        text, html, attachments = deconstruct_multipart(message)
    else:
        # A non-multipart message carries only a text body.
        text = message.get_payload(decode=True).decode('utf8')
        html = None
        attachments = []
    header_names = ('Subject', 'From', 'To', 'Cc', 'Bcc', 'Reply-To')
    subject, sender, to, cc, bcc, reply_to = [
        prepare_header(message[name]) for name in header_names
    ]
    tag = getattr(message, 'tag', None)
    return cls(
        manager=manager,
        From=sender,
        To=to,
        TextBody=text,
        HtmlBody=html,
        Subject=subject,
        Cc=cc,
        Bcc=bcc,
        ReplyTo=reply_to,
        Attachments=attachments,
        Tag=tag,
    )
def test_header_splitter(self):
    """A long structured header is folded at a ';' with a space continuation."""
    message = MIMEText('')
    # It'd be great if we could use add_header() here, but that doesn't
    # guarantee an order of the parameters.
    message['X-Foobar-Spoink-Defrobnit'] = (
        'wasnipoop; giraffes="very-long-necked-animals"; '
        'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"')
    buf = StringIO()
    Generator(buf).flatten(message)
    # The expected wire form, one physical line per entry.
    expected_lines = [
        'Content-Type: text/plain; charset="us-ascii"',
        'MIME-Version: 1.0',
        'Content-Transfer-Encoding: 7bit',
        'X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals";',
        ' spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"',
        '',
    ]
    self.ndiffAssertEqual(buf.getvalue(), '\n'.join(expected_lines))
def test_binary_body_with_encode_noop(self):
    """Binary payload with a no-op encoder survives flatten and re-parse."""
    # Issue 16564: This does not produce an RFC valid message, since to be
    # valid it should have a CTE of binary. But the below works, and is
    # documented as working this way.
    check = self.assertEqual
    raw_payload = b'\xfa\xfb\xfc\xfd\xfe\xff'
    original = MIMEApplication(raw_payload, _encoder=encoders.encode_noop)
    check(original.get_payload(), raw_payload)
    check(original.get_payload(decode=True), raw_payload)
    buf = StringIO()
    Generator(buf).flatten(original)
    reparsed = email.message_from_string(buf.getvalue())
    # The first check below looks at the original message again, not the
    # re-parsed one.
    check(original.get_payload(), raw_payload)
    check(reparsed.get_payload(decode=True), raw_payload)


# Test the basic MIMEText class
async def send_email(loop: asyncio.AbstractEventLoop, mail_from: str,
                     mail_to: Union[Iterable, str], subject: str, body: str,
                     server: str='localhost') -> None:
    """Send an email to one or more recipients.

    Only supports plain text emails with a single message body.
    No attachments etc.

    The function awaits the SMTP client, so it has to be a coroutine
    (``async def``); a plain ``def`` containing ``await`` does not compile.

    Args:
        loop: Event loop handed to the SMTP client.
        mail_from: Address used for the From header.
        mail_to: A single address or an iterable of addresses; one message is
            sent per address.
        subject: Subject header of every message.
        body: Plain-text body of every message.
        server: SMTP host name; the connection uses port 25.

    SMTP errors are logged and not re-raised.
    """
    if isinstance(mail_to, str):
        # A bare string would otherwise be iterated character by character.
        mail_to = [mail_to]

    smtp = aiosmtplib.SMTP(hostname=server, port=25, loop=loop)
    try:
        await smtp.connect()
        for rcpt in mail_to:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = mail_from
            msg['To'] = rcpt
            await smtp.send_message(msg)
        await smtp.quit()
    except aiosmtplib.errors.SMTPException as e:
        log.msg('Error sending smtp notification: %s' % (str(e)), 'NOTIFICATIONS')
def send_email(msg_subj, msg_txt, msg_rcpt, i=0, count=0):
    """Send a plain-text message through the Gmail API.

    Does nothing when no Gmail service object could be built. The current
    action is switched to SENDEMAIL for the call and set back afterwards;
    an HTTP error from the API is reported as a warning.
    """
    from email.mime.text import MIMEText
    userId, gmail = buildGAPIServiceObject(API.GMAIL, _getValueFromOAuth(u'email'))
    if not gmail:
        return
    msg = MIMEText(msg_txt)
    headers = ((u'Subject', msg_subj), (u'From', userId), (u'To', msg_rcpt))
    for header_name, header_value in headers:
        msg[header_name] = header_value
    previous_action = Act.Get()
    Act.Set(Act.SENDEMAIL)
    try:
        request_body = {u'raw': base64.urlsafe_b64encode(msg.as_string())}
        callGAPI(gmail.users().messages(), u'send',
                 userId=userId, body=request_body, fields=u'')
        entityActionPerformed(
            [Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], i, count)
    except googleapiclient.errors.HttpError as e:
        entityActionFailedWarning(
            [Ent.RECIPIENT, msg_rcpt, Ent.MESSAGE, msg_subj], str(e), i, count)
    Act.Set(previous_action)


# Write a CSV file
def _flush_recipient(self, recipient):
    """Render the template for one recipient and email it.

    ``recipient`` is read as a mapping with ``id``, ``routes`` (each route
    having a ``prefix``) and ``config.info.email``, which may be a single
    address or a list of addresses (duplicates in a list are dropped).
    """
    configured = recipient["config"]["info"]["email"]
    if isinstance(configured, list):
        email_addresses = list(set(configured))
    else:
        email_addresses = [configured]
    joined_addresses = ", ".join(email_addresses)

    prefixes = ", ".join([route["prefix"] for route in recipient["routes"]])
    logging.info("Sending email to {} ({}) for {}".format(
        recipient["id"], joined_addresses, prefixes
    ))

    body = self.template.format(
        id=recipient["id"],
        from_addr=self.from_addr,
        subject=self.subject,
        routes_list=self._format_list_of_routes(recipient["routes"]),
    )
    msg = MIMEText(body)
    msg['Subject'] = self.subject
    msg['From'] = self.from_addr
    msg['To'] = ", ".join(email_addresses)

    self._send_email(self.from_addr, email_addresses, msg.as_string())
def _send_mime_text_to_email(self, text, email):
    """Send a prepared MIME message to one address over SMTP with STARTTLS.

    The ``To`` header of ``text`` is set to ``email`` before sending. Host,
    credentials and sender come from ``self._smtp_config``. SMTP errors are
    logged and not re-raised.

    :type text: mime_text.MIMEText
    :type email: str
    :return:
    """
    text['To'] = email
    try:
        connection = smtplib.SMTP(self._smtp_config['host'])
        try:
            connection.ehlo()
            connection.starttls()
            connection.ehlo()
            connection.login(self._smtp_config['username'], self._smtp_config['password'])
            connection.sendmail(self._smtp_config['sender'], email, text.as_string())
            connection.quit()
        finally:
            # Release the socket even when a step above failed; close() is
            # harmless after a successful quit().
            connection.close()
    except smtplib.SMTPException:
        self._logger.exception('Unable to send email to address "{}"'.format(email))
def send_message(self, email_content):
    """Send ``email_content`` as a UTF-8 plain-text mail.

    The subject is ``self.smtp_email_header`` followed by the current time
    as ``[month-day hour:minute:second]`` (numbers are not zero-padded).
    Any failure is logged and not re-raised.
    """
    # Build the time-stamped subject line.
    now = datetime.datetime.now()
    timestamp = '[{0}-{1} {2}:{3}:{4}]'.format(
        now.month, now.day, now.hour, now.minute, now.second)
    header = self.smtp_email_header + timestamp

    msg = MIMEText(email_content, 'plain', 'utf-8')
    msg['from'] = self.smtp_from_addr
    msg['to'] = self.smtp_to_addr
    msg['Subject'] = Header(header, 'utf-8').encode()

    # Deliver the message.
    try:
        smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
        smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
        smtp_server.sendmail(
            self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
        smtp_server.quit()
    except Exception as e:
        if log.isEnabledFor(logging.ERROR):
            log.error("??????")
            log.exception(e)
def send(To, Subject, Body, Cc=(), Bcc=(), html=False, files=()):
    """Send an email through the Gmail API.

    Args:
        To: Iterable of recipient addresses.
        Subject: Subject line.
        Body: Message body.
        Cc: Iterable of carbon-copy addresses (optional).
        Bcc: Iterable of blind-carbon-copy addresses (optional).
        html: When true the body is sent as ``text/html``, otherwise as
            ``text/plain``.
        files: Iterable of file paths to attach (optional).

    The defaults are immutable tuples rather than shared lists.
    """
    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    # Only emit Cc/Bcc when there is something to put in them; an empty
    # address list is not a valid header value.
    if Cc:
        message['Cc'] = ', '.join(Cc)
    if Bcc:
        message['Bcc'] = ', '.join(Bcc)
    message.attach(MIMEText(Body, subtype))

    for f in files:
        with open(f, "rb") as In:
            part = MIMEApplication(In.read(), Name=basename(f))
            part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
            message.attach(part)

    serialized = message.as_string()
    if not isinstance(serialized, bytes):
        # On Python 3 as_string() returns text, but base64 needs bytes.
        serialized = serialized.encode('utf-8')
    # The request body is JSON, so the encoded message must be text.
    payload = {'raw': base64.urlsafe_b64encode(serialized).decode('ascii')}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=Http)

    service.users().messages().send(userId='me', body=payload).execute()
def send_email(self):
    """Send every prepared billing message over SMTP.

    Returns False when there was nothing to send and True otherwise, even
    if individual sends failed (those only print an error).
    """
    self.make_messages()
    if len(self.email_messages) == 0:
        return False
    for entry in self.email_messages:
        recipient = entry['email']
        body_text = entry['message']
        try:
            email_conn = smtplib.SMTP(host, port)
            email_conn.ehlo()
            email_conn.starttls()
            email_conn.login(username, password)
            the_msg = MIMEMultipart("alternative")
            the_msg['Subject'] = "Billing Update!"
            the_msg["From"] = from_email
            the_msg["To"] = recipient
            the_msg.attach(MIMEText(body_text, 'plain'))
            email_conn.sendmail(from_email, [recipient], the_msg.as_string())
            email_conn.quit()
        except smtplib.SMTPException:
            print("error sending message")
    return True
def send_email(sender, sender_pass, receiver, subject, body):
    """Send a plain-text email through Gmail's SMTP server.

    Args:
        sender: Address used for the From header and for logging in.
        sender_pass: Password used for logging in.
        receiver: Address used for the To header.
        subject: Subject line.
        body: Plain-text body.
    """
    # Create email
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = receiver

    # Send email out. The context manager closes the connection even when
    # starttls(), login() or send_message() raises.
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(sender, sender_pass)
        server.send_message(msg)


# Gets and caches your gmail address
def send_invite(user):
    """Email the account-approved notice to ``user``.

    Does nothing when no SMTP host is configured. The body is rendered from
    the ``emails/invite`` template with the user, domain and protocol.
    """
    if _cfg("smtp-host") == "":
        return
    smtp = smtplib.SMTP_SSL(_cfg("smtp-host"), _cfgi("smtp-port"))
    smtp.ehlo()
    smtp.login(_cfg("smtp-user"), _cfg("smtp-password"))
    with open("emails/invite") as f:
        rendered = pystache.render(f.read(), {
            'user': user,
            "domain": _cfg("domain"),
            "protocol": _cfg("protocol")
        })
    # HTMLParser.unescape() was removed in Python 3.9; html.unescape() is
    # the supported way to undo the template's HTML escaping.
    message = MIMEText(html.unescape(rendered))
    message['Subject'] = "Your wank.party account is approved"
    message['From'] = _cfg("smtp-user")
    message['To'] = user.email
    smtp.sendmail(_cfg("smtp-user"), [ user.email ], message.as_string())
    smtp.quit()
def send_reset(user):
    """Email the password-reset message to ``user``.

    Does nothing when no SMTP host is configured. The body is rendered from
    the ``emails/reset`` template with the user, domain, protocol and the
    user's ``passwordReset`` value as the confirmation.
    """
    if _cfg("smtp-host") == "":
        return
    smtp = smtplib.SMTP_SSL(_cfg("smtp-host"), _cfgi("smtp-port"))
    smtp.ehlo()
    smtp.login(_cfg("smtp-user"), _cfg("smtp-password"))
    with open("emails/reset") as f:
        rendered = pystache.render(f.read(), {
            'user': user,
            "domain": _cfg("domain"),
            "protocol": _cfg("protocol"),
            'confirmation': user.passwordReset
        })
    # HTMLParser.unescape() was removed in Python 3.9; html.unescape() is
    # the supported way to undo the template's HTML escaping.
    message = MIMEText(html.unescape(rendered))
    message['Subject'] = "Reset your wank.party password"
    message['From'] = _cfg("smtp-user")
    message['To'] = user.email
    smtp.sendmail(_cfg("smtp-user"), [ user.email ], message.as_string())
    smtp.quit()
def send_mail(to_list, sub, context):
    """Send a plain-text mail to every address in ``to_list``.

    Args:
        to_list: List of recipient addresses.
        sub: Subject line.
        context: Message body.

    Returns:
        True when the mail was handed to the server, False on any error.
    """
    me = mail_user
    msg = MIMEText(context)
    msg['Subject'] = sub
    msg['From'] = me
    # Addresses in a header are separated by commas, not semicolons.
    msg['To'] = ", ".join(to_list)
    try:
        s = smtplib.SMTP(mail_host)
        try:
            s.login(mail_user, mail_pass)
            s.sendmail(me, to_list, msg.as_string())
            s.quit()
        finally:
            # Release the socket even when login or sending failed.
            s.close()
        return True
    except Exception:
        # "except Exception, e" only parses on Python 2; the bound name was
        # unused anyway.
        return False
def send_exception_mail(exception):
    """Mail the given exception text; print "Mail Failed" if that fails.

    ``exception`` is used as the UTF-8 plain-text body of the message.
    """
    # noinspection PyBroadException
    try:
        msg = MIMEMultipart('alternative')
        msg['Subject'] = "Exception Occurred on betaPika"
        msg['From'] = FROM_MAIL
        msg['To'] = TO_MAIL
        plain = MIMEText(exception, "plain", "utf-8")
        msg.attach(plain)

        s = smtplib.SMTP()
        try:
            s.connect(SERVER, PORT)
            s.ehlo()
            s.starttls()
            s.ehlo()
            s.login(FROM_MAIL, FROM_PASSWORD)
            s.sendmail(FROM_MAIL, TO_MAIL, msg.as_string())
            s.quit()
        finally:
            # Release the socket even when a step above failed.
            s.close()
    except Exception:
        # Reporting is best effort, but KeyboardInterrupt and SystemExit are
        # no longer swallowed. print() with parentheses works on Python 2
        # and 3.
        print("Mail Failed")
def process_fill_event(self, fill_event):
    """Extension of abstract base class method. The extension implements the
    capability to send email notifications when fill events are received.

    The notification is sent from and to the configured Gmail address, with
    ``str(fill_event)`` as the body. Nothing is sent when no credentials are
    configured.
    """
    # Call super class method to handle the fill event.
    super(InteractiveBrokersPortfolio, self).process_fill_event(fill_event)

    # The original guard read ``gmail_and_password`` while every use read
    # ``gmail_and_pass``. NOTE(review): the class definition is not visible
    # here, so both spellings are tried; confirm which one __init__ sets.
    credentials = getattr(self, "gmail_and_pass", None)
    if credentials is None:
        credentials = getattr(self, "gmail_and_password", None)

    # Send an email notification containing the details of the fill event.
    #
    # TODO: Would it be better to make server an instance variable so that we
    # don't have to recreate it every time there's a fill event?
    if credentials is not None:
        server = smtplib.SMTP("smtp.gmail.com", 587)
        try:
            server.starttls()
            server.login(*credentials)
            msg = MIMEText(str(fill_event), "plain", "utf-8")
            msg["From"] = msg["To"] = credentials[0]
            msg["Subject"] = "Odin Trade Notification"
            server.sendmail(
                credentials[0], credentials[0], msg.as_string()
            )
            server.quit()
        finally:
            # Release the socket even when a step above failed.
            server.close()
def run(self, nickname=None, content=None, image=None):
    """Mail ``content`` as HTML to the configured receiver list.

    When ``image`` is given, an ``<img>`` tag pointing at it is appended
    after a line break. Returns True on success and False when sending
    raised (the error is logged).
    """
    if image is None:
        text = content
    else:
        text = content + "<br>" + "<img src={image} />".format(image=image)

    msg = MIMEText(text, _subtype="html", _charset="utf-8")
    msg["Subject"] = "????"
    msg["From"] = nickname + "<" + MAIL_SENDER + ">"
    msg["To"] = ",".join(MAIL_RECEIVER_LIST)

    try:
        server = smtplib.SMTP()
        server.connect(MAIL_HOST)
        server.starttls()
        server.login(MAIL_SENDER, MAIL_PASSWORD)
        server.sendmail(MAIL_SENDER, MAIL_RECEIVER_LIST, msg.as_string())
        server.close()
    except Exception as e:
        LOG.error('send mail failed with error: %s' % str(e))
        return False
    else:
        return True