我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用email.mime.multipart.MIMEMultipart()。
def as_mailobj(self): # Create the root message and fill in the from, to, and subject headers msgRoot = MIMEMultipart('related') msgRoot['Subject'] = self.subject msgRoot['From'] = self.get_sender() msgRoot['To'] = ", ".join(self.recipients) msgRoot.preamble = 'Generated by mailprod python tool' for i in range(len(self.headers)): name, value, params = self.headers[i] msgRoot.add_header(name, value, **params) for i in range(len(self.body)): part = self.body[i] msgRoot.attach(part) for i in range(len(self.attachments)): attachment = self.attachments[i] msgRoot.attach(attachment) return msgRoot
def mime_multipart(_, context, arg): """mime_multipart composes a MIME multipart string. Example: "UserData": { "Fn::Base64": { "CFPP::MimeMultipart": [ ["text/x-shellscript", {"CFPP::FileToString": "userdata.sh"}], ["text/cloud-config", {"CFPP::FileToString": "cloud-init.yaml"}] ] } } """ _raise_unless_mime_params(context, arg) mime_doc = MIMEMultipart() # set boundary explicitly so that they are stable based on path in the template. mime_doc.set_boundary("=" * 10 + hashlib.sha1(".".join(context)).hexdigest() + "=" * 3) for mime_type, contents in arg: sub_message = MIMEText(contents, contents, sys.getdefaultencoding()) mime_doc.attach(sub_message) return mime_doc.as_string()
def test_mime_attachments_in_constructor(self): eq = self.assertEqual text1 = MIMEText('') text2 = MIMEText('') msg = MIMEMultipart(_subparts=(text1, text2)) eq(len(msg.get_payload()), 2) eq(msg.get_payload(0), text1) eq(msg.get_payload(1), text2) # A general test of parser->model->generator idempotency. IOW, read a message # in, parse it into a message object tree, then without touching the tree, # regenerate the plain text. The original text and the transformed text # should be identical. Note: that we ignore the Unix-From since that may # contain a changed date.
def test__all__(self): module = __import__('email') # Can't use sorted() here due to Python 2.3 compatibility all = module.__all__[:] all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def get_mime_message(text, html_text=None, **kwargs): if not html_text: instance = MIMEText(text) else: instance = MIMEMultipart('alternative') instance.attach(MIMEText(text, 'plain')) instance.attach(MIMEText(html_text, 'html')) extra = MIMEBase('application', 'octet-stream') extra.set_payload(b'test content') encoders.encode_base64(extra) extra.add_header('Content-Disposition', 'attachment', filename='report.pdf') instance.attach(extra) instance['X-Accept-Language'] = 'en-us, en' for key, value in kwargs.items(): instance[key] = value return instance
def from_mime(cls, message, manager): """ Instantiates ``Email`` instance from ``MIMEText`` instance. :param message: ``email.mime.text.MIMEText`` instance. :param manager: :py:class:`EmailManager` instance. :return: :py:class:`Email` """ if isinstance(message, MIMEMultipart): text, html, attachments = deconstruct_multipart(message) else: text, html, attachments = message.get_payload(decode=True).decode('utf8'), None, [] subject = prepare_header(message['Subject']) sender = prepare_header(message['From']) to = prepare_header(message['To']) cc = prepare_header(message['Cc']) bcc = prepare_header(message['Bcc']) reply_to = prepare_header(message['Reply-To']) tag = getattr(message, 'tag', None) return cls(manager=manager, From=sender, To=to, TextBody=text, HtmlBody=html, Subject=subject, Cc=cc, Bcc=bcc, ReplyTo=reply_to, Attachments=attachments, Tag=tag)
def __init__(self, server, sender, password, receiver, title, message=None, path=None): """???Email :param title: ???????? :param message: ????????? :param path: ????????list??????str??????????? :param server: smtp??????? :param sender: ??????? :param password: ????????? :param receiver: ?????????“?”?????? """ self.title = title self.message = message self.files = path self.msg = MIMEMultipart('related') self.server = server self.sender = sender self.receiver = receiver self.password = password
def send_email(self): self.make_messages() if len(self.email_messages) > 0: for detail in self.email_messages: user_email = detail['email'] user_message = detail['message'] try: email_conn = smtplib.SMTP(host, port) email_conn.ehlo() email_conn.starttls() email_conn.login(username, password) the_msg = MIMEMultipart("alternative") the_msg['Subject'] = "Billing Update!" the_msg["From"] = from_email the_msg["To"] = user_email part_1 = MIMEText(user_message, 'plain') the_msg.attach(part_1) email_conn.sendmail(from_email, [user_email], the_msg.as_string()) email_conn.quit() except smtplib.SMTPException: print("error sending message") return True return False
def send_exception_mail(exception): # noinspection PyBroadException try: msg = MIMEMultipart('alternative') msg['Subject'] = "Exception Occurred on betaPika" msg['From'] = FROM_MAIL msg['To'] = TO_MAIL plain = MIMEText(exception, "plain", "utf-8") msg.attach(plain) s = smtplib.SMTP() s.connect(SERVER, PORT) s.ehlo() s.starttls() s.ehlo() s.login(FROM_MAIL, FROM_PASSWORD) s.sendmail(FROM_MAIL, TO_MAIL, msg.as_string()) s.quit() except: print "Mail Failed"
def send_txt(self, filename): # ???????????????????????????????????? self.smtp = smtplib.SMTP_SSL(port=465) self.smtp.connect(self.server) self.smtp.login(self.username, self.password) #self.msg = MIMEMultipart() self.msg = MIMEText("content", 'plain', 'utf-8') self.msg['to'] = self.to_mail self.msg['from'] = self.from_mail self.msg['Subject'] = "459" self.filename = filename + ".txt" self.msg['Date'] = Utils.formatdate(localtime=1) content = open(self.filename.decode('utf-8'), 'rb').read() # print content #self.att = MIMEText(content, 'base64', 'utf-8') #self.att['Content-Type'] = 'application/octet-stream' # self.att["Content-Disposition"] = "attachment;filename=\"%s\"" %(self.filename.encode('gb2312')) #self.att["Content-Disposition"] = "attachment;filename=\"%s\"" % Header(self.filename, 'gb2312') # print self.att["Content-Disposition"] #self.msg.attach(self.att) #self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string()) self.smtp.sendmail(self.msg['from'], self.msg['to'], self.msg.as_string()) self.smtp.quit()
def send_mail(self, sender, receiver, subject="", body=""): """Send email from sender to receiver """ mime_msg = MIMEMultipart('related') mime_msg['Subject'] = subject mime_msg['From'] = sender mime_msg['To'] = receiver msg_txt = MIMEText(body, 'plain') mime_msg.attach(msg_txt) try: host = getToolByName(self, 'MailHost') host.send(mime_msg.as_string(), immediate=True) except SMTPServerDisconnected as msg: logger.warn("SMTPServerDisconnected: %s." % msg) except SMTPRecipientsRefused as msg: raise WorkflowException(str(msg))
def testExtractAttachedKey(self): KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." message = MIMEMultipart() message.add_header("from", ADDRESS_2) key = MIMEApplication("", "pgp-keys") key.set_payload(KEY) message.attach(key) self.fetcher._keymanager.put_raw_key = Mock( return_value=defer.succeed(None)) def put_raw_key_called(_): self.fetcher._keymanager.put_raw_key.assert_called_once_with( KEY, address=ADDRESS_2) d = self._do_fetch(message.as_string()) d.addCallback(put_raw_key_called) return d
def testExtractInvalidAttachedKey(self): KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." message = MIMEMultipart() message.add_header("from", ADDRESS_2) key = MIMEApplication("", "pgp-keys") key.set_payload(KEY) message.attach(key) self.fetcher._keymanager.put_raw_key = Mock( return_value=defer.fail(KeyAddressMismatch())) def put_raw_key_called(_): self.fetcher._keymanager.put_raw_key.assert_called_once_with( KEY, address=ADDRESS_2) d = self._do_fetch(message.as_string()) d.addCallback(put_raw_key_called) d.addErrback(log.err) return d
def testExtractAttachedKeyAndNotOpenPGPHeader(self): KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." KEYURL = "https://leap.se/key.txt" OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) message = MIMEMultipart() message.add_header("from", ADDRESS_2) message.add_header("OpenPGP", OpenPGP) key = MIMEApplication("", "pgp-keys") key.set_payload(KEY) message.attach(key) self.fetcher._keymanager.put_raw_key = Mock( return_value=defer.succeed(None)) self.fetcher._keymanager.fetch_key = Mock() def put_raw_key_called(_): self.fetcher._keymanager.put_raw_key.assert_called_once_with( KEY, address=ADDRESS_2) self.assertFalse(self.fetcher._keymanager.fetch_key.called) d = self._do_fetch(message.as_string()) d.addCallback(put_raw_key_called) return d
def testExtractOpenPGPHeaderIfInvalidAttachedKey(self): KEY = "-----BEGIN PGP PUBLIC KEY BLOCK-----\n..." KEYURL = "https://leap.se/key.txt" OpenPGP = "id=12345678; url=\"%s\"; preference=signencrypt" % (KEYURL,) message = MIMEMultipart() message.add_header("from", ADDRESS_2) message.add_header("OpenPGP", OpenPGP) key = MIMEApplication("", "pgp-keys") key.set_payload(KEY) message.attach(key) self.fetcher._keymanager.put_raw_key = Mock( return_value=defer.fail(KeyAddressMismatch())) self.fetcher._keymanager.fetch_key = Mock() def put_raw_key_called(_): self.fetcher._keymanager.put_raw_key.assert_called_once_with( KEY, address=ADDRESS_2) self.fetcher._keymanager.fetch_key.assert_called_once_with( ADDRESS_2, KEYURL) d = self._do_fetch(message.as_string()) d.addCallback(put_raw_key_called) return d
def mail_client(host, port, fromAddress, password, toAddress, subject, body): msg = MIMEMultipart() msg['From'] = fromAddress msg['To'] = toAddress msg['Subject'] = subject message = body msg.attach(MIMEText(message)) mailserver = smtplib.SMTP(host,port) # Identify to the SMTP Gmail Client mailserver.ehlo() # Secure with TLS Encryption mailserver.starttls() # Reidentifying as an encrypted connection mailserver.ehlo() mailserver.login(fromAddress, password) mailserver.sendmail(fromAddress,toAddress,msg.as_string()) print ("Email sent from:", fromAddress) mailserver.quit()
def sendreport(self, body): """ Send a notification by email :param body: Body of the mail :return: """ smtp = self.config['smtp'] msg = multipart.MIMEMultipart() msg['From'] = smtp['from'] msg['To'] = smtp['to'] msg['Subject'] = smtp['subject'] msg.attach(text.MIMEText(body, 'plain')) server = smtplib.SMTP(smtp['host'], smtp['port']) server.starttls() server.login(smtp['user'], smtp['password']) server.sendmail(smtp['from'], smtp['to'], msg.as_string()) server.quit()
def book_msg(to_addr, book): # TODO: Give a subject. subject = '' mobi_workshop_dir = '../../mobi_workshop/' bookname = book+'.mobi' msg = MIMEMultipart() msg['From'] = mail_config['from_addr'] msg['To'] = to_addr msg['Subject'] = Header(subject, 'utf-8').encode() msg.attach(MIMEText('send with file...', 'plain', 'utf-8')) # TODO: check if target is 0kb, if so, send_alert. with open(mobi_workshop_dir+book+'.mobi', 'rb') as f: mime = MIMEBase('*', '*/*', filename=bookname) mime.add_header('Content-Disposition', 'attachment', filename=bookname) mime.add_header('Content-ID', '<0>') mime.add_header('X-Attachment-Id', '0') mime.set_payload(f.read()) encoders.encode_base64(mime) msg.attach(mime) return msg
def send_email(self, mail_from, mail_to, subject, html_msg, cc_list=None, plain_msg=None): LOG.info('Sending email ....') message = MIMEMultipart() message['Subject'] = subject message['to'] = mail_to if cc_list: message['cc'] = ', '.join(cc_list) message['from'] = mail_from or self.notify_from msg = MIMEText(html_msg, 'html') message.attach(msg) if plain_msg: plain_msg = MIMEText(plain_msg, 'plain') message.attach(plain_msg) try: self.server.sendmail(mail_from, mail_to, message.as_string()) LOG.info('Email sent successfully !') except Exception as e: LOG.error(e)
def gen_msg(content, subject, attachments, nick_from=None, nick_to='??'): if nick_from is None: nick_from = FROM_ADDR msg = MIMEMultipart() msg['From'] = _format_addr('{} <{}>'.format(nick_from, FROM_ADDR)) msg['To'] = _format_addr('{} <{}>'.format(nick_to, TO_ADDRS)) msg['Subject'] = Header(subject) msg.attach(MIMEText(content, 'html', 'utf-8')) for attachment in attachments: attach = MIMEText(open(attachment, 'rb').read(), 'base64', 'utf-8') attach['Content-Type'] = 'application/octet-stream' attach['Content-Disposition'] = 'attachment; filename="{}"'.format( os.path.basename(attachment)) msg.attach(attach) return msg
def notify(self, event): try: msg = MIMEMultipart() msg['From'] = self.emailer_settings['from_address'] msg['To'] = self.emailer_settings['to_address'] msg['Subject'] = "{0} {1}".format(event['subject'], event['scanname']) targets = "".join(["<li>{0}</li>".format(t) for t in event['targets']]) if event['targets'] else "" html = str(self.emailer_settings['email_template']).format(event['server'], event['scanname'], event['scanid'], event['subject'], targets) msg.attach(MIMEText(html, 'html')) mail_server = smtplib.SMTP(self.emailer_settings['smtp_host'], self.emailer_settings['smtp_port']) mail_server.sendmail(msg['From'], msg['To'], msg.as_string()) mail_server.quit() except (Exception, AttributeError) as e: # we don't want email failure to stop us, just log that it happened Logger.app.error("Your emailer settings in config.ini is incorrectly configured. Error: {}".format(e))
def cloudscan_notify(self, recipient, subject, git_url, ssc_url, state, scan_id, scan_name): try: msg = MIMEMultipart() msg['From'] = self.from_address msg['To'] = recipient msg['Subject'] = subject html = str(self.email_template).format(git_url, ssc_url, scan_name, scan_id, state, self.chatroom) msg.attach(MIMEText(html, 'html')) mail_server = smtplib.SMTP(self.smtp_host, self.smtp_port) mail_server.sendmail(msg['From'], msg['To'], msg.as_string()) mail_server.quit() except (Exception, AttributeError) as e: # we don't want email failure to stop us, just log that it happened Logger.app.error("Your emailer settings in config.ini is incorrectly configured. Error: {}".format(e))
def test_send_mime(self, mock_smtp, mock_smtp_ssl): mock_smtp.return_value = mock.Mock() mock_smtp_ssl.return_value = mock.Mock() msg = MIMEMultipart() utils.email.send_MIME_email('from', 'to', msg, dryrun=False) mock_smtp.assert_called_with( configuration.get('smtp', 'SMTP_HOST'), configuration.getint('smtp', 'SMTP_PORT'), ) self.assertTrue(mock_smtp.return_value.starttls.called) mock_smtp.return_value.login.assert_called_with( configuration.get('smtp', 'SMTP_USER'), configuration.get('smtp', 'SMTP_PASSWORD'), ) mock_smtp.return_value.sendmail.assert_called_with('from', 'to', msg.as_string()) self.assertTrue(mock_smtp.return_value.quit.called)
def mail(self): gonderen = "sender e-mail" pswd = "sender e-mail passwd" alici = "receiver e mail " cc = "if you need add cc e-mail " body=self.mesaj messg = MIMEMultipart() messg['To'] = alici messg['Cc'] = cc messg['From'] = gonderen messg['Subject'] = "ANomaly tespit sistemi" text = MIMEText(body, 'plain') messg.attach(text) print(self.mesaj) try: server = smtplib.SMTP("smtp.live.com", 587) server.starttls() server.login(gonderen, pswd) server.sendmail(gonderen,[alici,cc,alici],messg.as_string()) server.close() print("mail gönderildi") except smtplib.SMTPException as e: print(str(e))
def sendemail(from_addr, to_addr_list, cc_addr_list, subject, message, login, password, smtpserver='smtp.gmail.com:587'): msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = from_addr msg['To'] = ','.join(to_addr_list) htmlmessage = '<html><head></head><body>'+message+'</body></html>' part1 = MIMEText(message, 'plain') part2 = MIMEText(htmlmessage, 'html') msg.attach(part1) msg.attach(part2) server = smtplib.SMTP(smtpserver) server.starttls() server.login(login,password) problems = server.sendmail(from_addr, to_addr_list, msg.as_string()) server.quit() return problems
def send_email(send_from,send_to,subject,text,files=None,server="10.210.41.203"): #assert(isinstance(send_to,list),"Send To email should be a list") msg = MIMEMultipart() msg['From'] = send_from msg['To'] = send_to msg['Date'] = formatdate(localtime = True) msg['Subject'] = subject msg.attach(MIMEText(text,'html')) with open(files,"rb") as f: part = MIMEApplication(f.read(),Name=basename(files)) msg.attach(part) smtp = smtplib.SMTP(server,0) smtp.sendmail(send_from,send_to,msg.as_string()) smtp.quit()
def send_mail(self, subject, message, files=None): if files is None: files = [] msg = MIMEMultipart() msg['Date'] = formatdate(localtime=True) msg['Subject'] = subject msg.attach(MIMEText(message)) # TODO files attachment max size if files is not None: for f in files: part = MIMEBase('application', "octet-stream") part.set_payload(open(f, "rb").read()) encoders.encode_base64(part) part.add_header('Content-Disposition', 'attachment; filename="{0}"'.format(os.path.basename(f))) msg.attach(part) self.logger.debug('Sending mail to {0} {1}'.format(self.to_address, ' about {0}'.format(subject))) self.server.sendmail(self.from_username, self.to_address, msg.as_string()) self.logger.debug('Mail was sent.')