我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用django.core.mail.get_connection()。
def send_mail(subject, message, from_email, recipient_list,
              fail_silently=False, auth_user=None, auth_password=None,
              connection=None, html_message=None, headers=None,
              cc=None, bcc=None):
    """Send a single email, with support for custom headers, cc and bcc.

    A connection is created from ``auth_user``/``auth_password``/
    ``fail_silently`` only when the caller does not supply one. When
    ``html_message`` is truthy it is attached as a ``text/html`` alternative.

    Returns whatever the message's ``send()`` returns.
    """
    if not connection:
        connection = get_connection(
            username=auth_user,
            password=auth_password,
            fail_silently=fail_silently,
        )

    email = EmailMultiAlternatives(
        subject,
        message,
        from_email,
        recipient_list,
        connection=connection,
        headers=headers,
        cc=cc,
        bcc=bcc,
    )
    if html_message:
        email.attach_alternative(html_message, 'text/html')
    return email.send()
def check_email_server_is_alive(app_configs=None, **kwargs):
    """Return a warning list if the email backend cannot be opened.

    The probe only runs when signup or the contact form is enabled in
    settings; otherwise an empty list is returned without touching the
    backend.
    """
    from django.conf import settings

    email_needed = (settings.POOTLE_SIGNUP_ENABLED
                    or settings.POOTLE_CONTACT_ENABLED)
    if not email_needed:
        return []

    from django.core.mail import get_connection

    connection = get_connection()
    try:
        connection.open()
    except Exception:
        # Any failure to open is reported as a warning, not raised.
        return [checks.Warning(
            _("Email server is not available."),
            hint=_("Review your email settings and make sure your email "
                   "server is working."),
            id="pootle.W004",
        )]

    # Opened fine: release the connection again.
    connection.close()
    return []
def send_mass_html_mail(datatuple, fail_silently=False, user=None,
                        password=None, connection=None):
    """Send many text+HTML emails over one connection.

    ``datatuple`` is an iterable of
    ``(subject, text_content, html_content, from_email, recipient_list)``
    rows; one message is built per row, with the HTML attached as a
    ``text/html`` alternative.

    ``user``, ``password`` and ``fail_silently`` are passed to
    ``get_connection`` only when no ``connection`` is supplied.

    Returns the result of ``connection.send_messages``.
    """
    if not connection:
        connection = get_connection(
            username=user,
            password=password,
            fail_silently=fail_silently,
        )

    def build(subject, text, html, from_email, recipient):
        email = EmailMultiAlternatives(subject, text, from_email, recipient)
        email.attach_alternative(html, 'text/html')
        return email

    outgoing = [
        build(subject, text, html, from_email, recipient)
        for subject, text, html, from_email, recipient in datatuple
    ]
    return connection.send_messages(outgoing)
def get_mail(**kwargs):
    """Describe the email configuration and probe the backend.

    Returns an ``OrderedDict`` with the keys ``backend``, ``host``, ``tls``,
    ``ssl`` and ``status``. ``status`` is ``"OK"`` when a connection could be
    opened and closed, otherwise the text of the exception that was raised.
    """
    def probe():
        from django.core.mail import get_connection
        try:
            connection = get_connection(fail_silently=False)
            connection.open()
            connection.close()
        except Exception as exc:
            return str(exc)
        return "OK"

    # NOTE(review): Django's own settings are EMAIL_USE_TLS / EMAIL_USE_SSL;
    # the lookups below read USE_TLS / USE_SSL -- confirm these are
    # project-defined settings and not a typo.
    return OrderedDict([
        ("backend", settings.EMAIL_BACKEND),
        ("host", "{0}:{1}".format(settings.EMAIL_HOST, settings.EMAIL_PORT)),
        ("tls", getattr(settings, "USE_TLS", False)),
        ("ssl", getattr(settings, "USE_SSL", False)),
        ("status", probe()),
    ])
def mail_send_task(to: str, subject: str, body: str, html: str, sender: str,
                   event: int=None, cc: list=None, bcc: list=None,
                   headers: dict=None):
    """Build and send one email, optionally on behalf of an event.

    With a truthy ``event`` id, the event is loaded and supplies the fallback
    sender, the default ``reply-to`` header and the mail backend. Without one,
    the default connection is used.

    Raises:
        SendMailException: if the backend raises while sending (the original
            error is logged first).
    """
    if not headers:
        headers = dict()

    if not event:
        backend = get_connection(fail_silently=False)
    else:
        event = Event.objects.get(id=event)
        mail_from = event.settings.get('mail_from')
        if not sender:
            sender = mail_from
        # An explicit reply-to passed in by the caller wins.
        headers['reply-to'] = headers.get('reply-to', event.settings.get('mail_from'))
        backend = event.get_mail_backend()

    email = EmailMultiAlternatives(
        subject, body, sender, to=to, cc=cc, bcc=bcc, headers=headers,
    )
    if html is not None:
        email.attach_alternative(inline_css(html), 'text/html')

    try:
        backend.send_messages([email])
    except Exception:
        logger.exception('Error sending email')
        raise SendMailException('Failed to send an email to {}.'.format(to))
def send_license_deactivated_email(payment, last_4):
    """Send the 'license deactivated' email to the account's billing admins.

    Uses a connection built with the billing credentials from settings and
    BCCs ``settings.BILLING_BCC``. ``payment`` and ``last_4`` are passed to
    the templates as context.
    """
    billing_connection = get_connection(
        username=settings.EMAIL_HOST_USER_BILLING,
        password=settings.EMAIL_HOST_PASSWORD_BILLING,
    )
    template_context = {'payment': payment, 'last_4': last_4}

    admin_emails = payment.account.billing_admins.values_list('email', flat=True)
    to_emails = list(admin_emails)

    send_mail(
        subject_template_name='emails/billing/license_deactivated_subject.txt',
        email_template_name='emails/billing/license_deactivated.txt',
        to_email=to_emails,
        bcc=[settings.BILLING_BCC],
        context=template_context,
        from_email=settings.BILLING_EMAIL,
        html_email_template_name='emails/billing/license_deactivated.html',
        connection=billing_connection,
    )
def send_mail_with_bcc(subject, html_message, recipient_list, fail_silently=False):
    """Send an HTML email to many recipients via BCC, 100 addresses per message.

    Each batch gets its own connection and message. A failure in one batch is
    printed with its traceback and does not stop later batches.

    Args:
        subject: Subject line for every batch.
        html_message: HTML body, attached as a ``text/html`` alternative.
        recipient_list: Sequence of addresses; sliced into groups of 100.
        fail_silently: Passed to ``get_connection``.
    """
    def divide_group(lst, k):
        return [lst[i:i + k] for i in range(0, len(lst), k)]

    for grp in divide_group(recipient_list, 100):
        try:
            connection = get_connection(
                username=None,
                password=None,
                fail_silently=fail_silently,
            )
            mail = EmailMultiAlternatives(subject, bcc=grp, connection=connection)
            mail.attach_alternative(html_message, 'text/html')
            mail.send()
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit and made the loop impossible to stop cleanly.
            traceback.print_exc()
        # NOTE(review): the source had lost its indentation; the pause is
        # assumed to run after every batch (throttling), not only after a
        # failure -- confirm against the original file.
        time.sleep(30)
def send_mail_with_backend(
        subject, body, from_email, recipient_list, html_message=None,
        fail_silently=False, auth_user=None, auth_password=None,
        attachments=None, alternatives=None, connection=None, headers=None,
        do_not_encrypt_this_message=False):
    """Build and send one multi-part email.

    A connection is created from the auth arguments only when none is given.
    ``html_message`` (if truthy) and every ``(content, mimetype)`` pair in
    ``alternatives`` are attached as alternatives. When
    ``do_not_encrypt_this_message`` is truthy, the attribute of the same name
    is set to ``True`` on the message before sending.

    Returns the result of the message's ``send()``.
    """
    if not connection:
        connection = mail.get_connection(
            username=auth_user,
            password=auth_password,
            fail_silently=fail_silently,
        )

    message = mail.EmailMultiAlternatives(
        subject,
        body,
        from_email,
        recipient_list,
        attachments=attachments,
        connection=connection,
        headers=headers,
    )

    if html_message:
        message.attach_alternative(html_message, 'text/html')

    if alternatives:
        for content, mimetype in alternatives:
            message.attach_alternative(content, mimetype)

    if do_not_encrypt_this_message:
        message.do_not_encrypt_this_message = True

    return message.send()
def mail_send_task(to: str, subject: str, body: str, sender: str,
                   cc: list=None, bcc: list=None, headers: dict=None):
    """Send one email through the default connection.

    Raises:
        SendMailException: if the backend raises while sending (the original
            error is logged first).
    """
    message = EmailMultiAlternatives(
        subject, body, sender, to=to, cc=cc, bcc=bcc, headers=headers,
    )
    connection = get_connection(fail_silently=False)

    try:
        connection.send_messages([message])
    except Exception:
        logger.exception('Error sending email')
        raise SendMailException('Failed to send an email to {}.'.format(to))
def test_silent_exception(self):
    """Sending over a ``fail_silently=True`` connection must not raise."""
    silent_backend = mail.get_connection(fail_silently=True)
    with silent_backend as connection:
        send_with_connection(connection)
def test_loud_exception(self):
    """Sending over a default connection must raise ``ValueError``."""
    with mail.get_connection() as connection, pytest.raises(ValueError):
        send_with_connection(connection)
def test_close_closed_connection():
    """Close the connection explicitly inside its own ``with`` block.

    Presumably guards against errors when the context manager then closes an
    already-closed connection on exit -- confirm against the backend.
    """
    backend = mail.get_connection()
    with backend as connection:
        connection.close()
def send(self, connection=None, fail_silently=False):
    """Send this queued message and mark it as posted.

    Builds an ``EmailMultiAlternatives`` from the stored subject, body and
    address fields, sends it, then records the posting time and status and
    saves the instance. When no ``connection`` is given, one is created with
    the ``EMAIL_QUEUE_EMAIL_BACKEND`` setting.

    Returns the message object that was sent.
    """
    def split_addresses(raw):
        # Empty or whitespace-only fields mean "no addresses".
        if raw and raw.strip():
            return raw.split(EMAIL_ADDRESS_SEPARATOR)
        return None

    if connection is None:
        from django.core.mail import get_connection
        connection = get_connection(
            backend=settings.EMAIL_QUEUE_EMAIL_BACKEND,
            fail_silently=fail_silently,
        )

    to = split_addresses(self.to)
    cc = split_addresses(self.cc)
    bcc = split_addresses(self.bcc)

    mail = EmailMultiAlternatives(
        subject=self.subject,
        body=self.body_text,
        from_email=self.from_email,
        to=to,
        cc=cc,
        bcc=bcc,
        connection=connection,
    )
    if self.body_html:
        mail.attach_alternative(self.body_html, HTML_MIME_TYPE)
    mail.send(fail_silently=fail_silently)

    # Record the successful hand-off.
    self.posted = timezone.now()
    self.status = QueuedEmailMessageStatus.posted
    self.save()
    logging.debug("Message posted: %s", self)
    return mail
def get_connection(self, fail_silently=False):
    """Return this object's connection, creating it on first use.

    ``fail_silently`` only takes effect when a new connection is created; an
    existing truthy ``self.connection`` is returned unchanged.
    """
    from django.core.mail import get_connection

    if self.connection:
        return self.connection
    self.connection = get_connection(fail_silently=fail_silently)
    return self.connection
def send(self, fail_silently=False):
    """Send this message and return the result of ``send_messages``.

    Returns 0 without obtaining a connection when there are no recipients.
    """
    if self.recipients():
        return self.get_connection(fail_silently).send_messages([self])
    # Nobody to send to: skip creating the network connection.
    return 0
def connection(self):
    """Return a new connection for ``self.email_backend`` with ``fail_silently=True``."""
    backend = get_connection(backend=self.email_backend, fail_silently=True)
    return backend
def mail_connection():
    """Yield an open mail connection after resetting ``mail.outbox``.

    Presumably used as a test fixture (no decorator is visible here); the
    connection's context manager is exited when the generator is resumed.
    """
    mail.outbox = []
    backend = mail.get_connection()
    with backend as connection:
        yield connection
def send(self, event, users, instance, paths):
    """Email a notification about ``instance`` to every eligible user.

    ``users`` maps each user to their templates. A user is skipped when the
    notification is not enabled for them, when they have no email address, or
    when no template is found. All messages go out over one connection; no
    connection is opened when there is nothing to send.
    """
    sender = settings.ASYNC_EMAIL_FROM

    extra_headers = {}
    if hasattr(settings, 'ASYNC_EMAIL_REPLY_TO'):
        extra_headers = {
            'Reply-To': settings.ASYNC_EMAIL_REPLY_TO,
            'Return-Path': settings.ASYNC_EMAIL_REPLY_TO,
        }

    outgoing = []
    for user, templates in users.items():
        if not (self.enabled(event, user, paths) and user.email):
            continue

        template = self._get_template(templates)
        if template is None:
            continue

        params = self.prepare(template, instance)
        message = mail.EmailMultiAlternatives(
            subject=params['subject'],
            body=params['description'],
            from_email=sender,
            to=[user.email],
            headers=extra_headers,
        )
        # NOTE(review): ``request`` is not a parameter or local of this
        # function; unless it is a module-level name this line raises
        # NameError -- confirm where it is meant to come from.
        html_body = markdown2.markdown(
            params['description'],
            extras=["link-patterns"],
            link_patterns=link_registry.link_patterns(request),
            safe_mode=True,
        )
        message.attach_alternative(html_body, 'text/html')
        outgoing.append(message)

    if not outgoing:
        return

    connection = mail.get_connection()
    connection.send_messages(outgoing)
def send(self, request, queryset):
    """Admin action: send every selected reimbursement and email the recipients.

    Each reimbursement is sent on behalf of ``request.user`` and its email is
    queued. A ``ValidationError`` is logged and counted as a failure. The
    queued emails go out over one connection, and a summary message is shown
    to the admin user.

    Args:
        request: The current admin request.
        queryset: The selected reimbursements.
    """
    msgs = []
    sent = 0
    errors = 0
    for reimb in queryset:
        try:
            reimb.send(request.user)
            msgs.append(emails.create_reimbursement_email(reimb, request))
            sent += 1
        except ValidationError as e:
            errors += 1
            # ``message`` only exists when the error was built from a single
            # message; list/dict based errors expose ``messages`` instead, and
            # reading ``e.message`` on those would raise AttributeError here.
            if hasattr(e, 'message'):
                detail = e.message
            else:
                detail = '; '.join(e.messages)
            logging.error(detail)

    if msgs:
        connection = mail.get_connection()
        connection.send_messages(msgs)

    if sent > 0 and errors > 0:
        self.message_user(request, (
            "%s reimbursements sent, %s reimbursements not sent. Did you "
            "check that they were invited before and with money assigned?"
            % (sent, errors)), level=messages.WARNING)
    elif sent > 0:
        self.message_user(request, '%s reimbursement sent' % sent,
                          level=messages.SUCCESS)
    else:
        self.message_user(request,
                          'Reimbursement couldn\'t be sent! Did you check '
                          'that app was invited before and with money '
                          'assigned?',
                          level=messages.ERROR)
def send_batch_emails(emails):
    """Send all ``emails`` over a single default connection."""
    mail.get_connection().send_messages(emails)
def handle(self, *args, **options):
    """Send last reminders, then expire applications that ran out of time.

    Step 1: invited applications whose status has not changed for four days
    get ``last_reminder()`` called and a reminder email sent.
    Step 2: applications in the last-reminder status whose status has not
    changed for one day get ``expire()`` called.
    Progress is written to stdout.
    """
    # --- Step 1: last reminders -------------------------------------------
    reminder_cutoff = datetime.today() - timedelta(days=4)
    self.stdout.write('Checking reminders...')
    reminders = models.Application.objects.filter(
        status_update_date__lte=reminder_cutoff,
        status=models.APP_INVITED,
    )
    self.stdout.write('Checking reminders...%s found' % reminders.count())

    self.stdout.write('Sending reminders...')
    msgs = []
    for app in reminders:
        app.last_reminder()
        msgs.append(emails.create_lastreminder_email(app))

    connection = mail.get_connection()
    connection.send_messages(msgs)
    self.stdout.write(self.style.SUCCESS(
        'Sending reminders... Successfully sent %s reminders' % len(msgs)))

    # --- Step 2: expiry ---------------------------------------------------
    expiry_cutoff = datetime.today() - timedelta(days=1)
    self.stdout.write('Checking expired...')
    expired = models.Application.objects.filter(
        status_update_date__lte=expiry_cutoff,
        status=models.APP_LAST_REMIDER,
    )
    self.stdout.write('Checking expired...%s found' % expired.count())

    self.stdout.write('Setting expired...')
    count = 0
    for app in expired:
        app.expire()
        count += 1
    self.stdout.write(self.style.SUCCESS(
        'Setting expired... Successfully expired %s applications' % count))
def send_notification(self, sender, dests, reply_to=None, message_id=None,
                      reference=None, footer=None, subject=None):
    """Email this message to each destination over one connection.

    ``dests`` is an iterable of ``(object, name, email)`` triples. For each
    one a ``MessageAuthor`` is fetched or created and a per-recipient token
    is derived from it. ``reply_to`` is a ``(name, email_template)`` pair
    whose template is formatted with that token. ``message_id`` is a template
    formatted with ``id=`` to produce the ``Message-ID`` and, together with
    ``reference``, the ``References`` header. ``footer`` is appended to the
    body when it is not ``None``.
    """
    outgoing = []
    for target, dest_name, dest_email in dests:
        target_type = ContentType.objects.get_for_model(target)
        author, _created = MessageAuthor.objects.get_or_create(
            author_type=target_type, author_id=target.pk)

        signature = hexdigest_sha256(settings.SECRET_KEY, self.token, author.token)[:16]
        token = self.token + author.token + signature

        reply_to_list = []
        if reply_to:
            reply_to_name, reply_to_email = reply_to
            reply_to_list = ['%s <%s>' % (reply_to_name, reply_to_email.format(token=token))]

        headers = dict()
        if message_id:
            headers['Message-ID'] = message_id.format(id=self.token)
            if reference:
                headers['References'] = message_id.format(id=reference)

        body = self.content
        if footer is not None:
            body += footer

        outgoing.append(EmailMessage(
            subject=subject or self.subject,
            body=body,
            from_email='%s <%s>' % sender,
            to=['%s <%s>' % (dest_name, dest_email)],
            reply_to=reply_to_list,
            headers=headers,
        ))

    connection = get_connection()
    connection.send_messages(outgoing)
def get_mail_backend(self, force_custom: bool=False) -> BaseEmailBackend:
    """Return the mail backend to use for this object.

    A ``CustomSMTPBackend`` built from this object's SMTP settings is
    returned when those settings enable it or ``force_custom`` is true;
    otherwise the default connection is returned.
    """
    from pretalx.common.mail import CustomSMTPBackend

    use_custom = self.settings.smtp_use_custom or force_custom
    if not use_custom:
        return get_connection(fail_silently=False)

    return CustomSMTPBackend(
        host=self.settings.smtp_host,
        port=self.settings.smtp_port,
        username=self.settings.smtp_username,
        password=self.settings.smtp_password,
        use_tls=self.settings.smtp_use_tls,
        use_ssl=self.settings.smtp_use_ssl,
        fail_silently=False,
    )
def deliver_to_recipient(recipient, subject, message, from_email,
                         message_html=None, attachments=None, nofilter=False,
                         rfc2822_headers=None):
    """Create one email for ``recipient`` and send it.

    The unfiltered backend is used when ``nofilter`` is true or when the
    recipient's domain is listed in ``settings.EMAIL_UNFILTERED_DOMAINS``;
    otherwise the regular backend is used.

    Returns:
        A ``(message_id, mime_message)`` tuple for the sent mail.
    """
    msg = create_mail(subject, message, from_email, recipient, message_html,
                      attachments, rfc2822_headers)
    msgid = msg.extra_headers['Message-ID']

    # The domain is whatever follows the LAST "@": ``split('@')[1]`` raised
    # IndexError for an address without "@" and returned the wrong part when
    # the local part itself contained "@".
    _local_part, at_sign, domain = recipient.rpartition('@')
    in_unfiltered_domain = bool(at_sign) and domain in settings.EMAIL_UNFILTERED_DOMAINS

    backend = settings.EMAIL_BACKEND
    if nofilter or in_unfiltered_domain:
        backend = settings.EMAIL_BACKEND_UNFILTERED

    connection = mail.get_connection(backend=backend)
    connection.send_messages([msg])
    return (msgid, msg.message(),)
def sendEmail(subject, content, from_address, from_name='', to=None, cc=None,
              bcc=None, attachment_name='attachment', attachment=None,
              html_content=None):
    """Send one email with optional HTML alternative and one attachment.

    Args:
        subject: Subject line.
        content: Plain-text body.
        from_address: Sender address; also used as the reply-to. When falsy,
            no from/reply-to is set on the message.
        from_name: Display name placed before the sender address.
        to, cc: Address sequences; both end up in the message's ``to`` list.
            ``None`` (the default) means no addresses.
        bcc: Address sequence for blind copies; ``None`` means none.
        attachment_name: Filename used for ``attachment``.
        attachment: Attachment content; skipped when falsy.
        html_content: HTML body, attached as ``text/html`` when truthy.
    """
    # ``None`` defaults replace the former shared ``[]`` defaults; empty
    # entries are dropped from every address list.
    recipients = [x for x in list(to or []) + list(cc or []) if x]
    bcc = [x for x in (bcc or []) if x]

    from_email = from_name + ' <' + from_address + '>' if from_address else None
    reply_to = [from_address, ] if from_address else None

    logger.info('Sending email from %s to %s', from_address, recipients)
    if getattr(settings, 'DEBUG', None):
        logger.info('Email content:\n\n%s', content)
        logger.info('Email HTML content:\n\n%s', html_content)

    # The context manager opens the connection on entry and closes it on
    # exit, so the explicit open()/close() calls were redundant.
    with get_connection() as connection:
        message = EmailMultiAlternatives(
            subject=subject,
            body=content,
            from_email=from_email,
            to=recipients,
            bcc=bcc,
            reply_to=reply_to,
            connection=connection,
        )
        if html_content:
            message.attach_alternative(html_content, "text/html")
        if attachment:
            message.attach(attachment_name, attachment)
        message.send(fail_silently=False)
def bulk_email_view(self, request, object_id):
    """Admin view: email every player of a season that has an address.

    On GET (or an invalid/unconfirmed POST) the bulk email form is rendered.
    On a valid, confirmed POST one message per distinct non-empty player
    email is sent and the user is redirected to the season changelist.

    Raises:
        Http404: if the season does not exist.
        PermissionDenied: if the user lacks ``tournament.bulk_email`` on the
            season's league.
    """
    season = get_object_or_404(Season, pk=object_id)
    if not request.user.has_perm('tournament.bulk_email', season.league):
        raise PermissionDenied

    if request.method == 'POST':
        form = forms.BulkEmailForm(season.seasonplayer_set.count(), request.POST)
        if form.is_valid() and form.cleaned_data['confirm_send']:
            season_players = season.seasonplayer_set.all()
            # A set, so players sharing an address get only one email.
            email_addresses = {sp.player.email for sp in season_players
                               if sp.player.email != ''}
            email_messages = []
            for addr in email_addresses:
                message = EmailMultiAlternatives(
                    form.cleaned_data['subject'],
                    form.cleaned_data['text_content'],
                    settings.DEFAULT_FROM_EMAIL,
                    [addr]
                )
                message.attach_alternative(form.cleaned_data['html_content'], 'text/html')
                email_messages.append(message)

            # Context manager instead of manual open()/close(): the
            # connection is now closed even if send_messages() raises.
            with mail.get_connection() as conn:
                conn.send_messages(email_messages)

            self.message_user(request, 'Emails sent to %d players.' % len(season_players), messages.INFO)
            return redirect('admin:tournament_season_changelist')
    else:
        form = forms.BulkEmailForm(season.seasonplayer_set.count())

    context = {
        'has_permission': True,
        'opts': self.model._meta,
        'site_url': '/',
        'original': season,
        'title': 'Bulk email',
        'form': form
    }

    return render(request, 'tournament/admin/bulk_email.html', context)
def handle(self, email=True, verbosity=1, **kwargs):
    """Deactivate active users whose last login is older than the expiry cutoff.

    Does nothing when no cutoff is configured. Emails are prepared for the
    affected users before they are deactivated, and are sent afterwards when
    ``email`` is truthy and at least one email was produced.
    """
    self.verbosity = verbosity
    user_model = get_user_model()
    expiry = ExpirySettings.get()

    cutoff = expiry.earliest_possible_login
    if cutoff is None:
        self._info("Account expiry not configured; nothing to do.")
        return

    self._info("Checking for users who haven't logged in since %s" % cutoff)
    stale_users = user_model.objects.filter(is_active=True, last_login__lt=cutoff)

    usernames = stale_users.values_list(user_model.USERNAME_FIELD, flat=True)
    for username in usernames:
        self._info("Deactiviting user: %s" % username)

    # Build the emails BEFORE the update: afterwards the queryset no longer
    # matches these users (they are no longer active).
    candidates = (self._make_email(expiry, user) for user in stale_users)
    outgoing = [message for message in candidates if message]

    count = stale_users.update(is_active=False)
    if count:
        self._info("%d account(s) expired" % count)

    if email and outgoing:
        self._info("Sending e-mails")
        connection = mail.get_connection()
        connection.send_messages(outgoing)

    self._info("Done")