我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用django.contrib.auth.get_user_model()。
def clean_invitees(self): self.invitee_users = [] data = self.cleaned_data['invitees'] invitees = [s.strip() for s in data.split(',')] for invitee in invitees: User = get_user_model() try: invitee_user = User.objects.get(username=invitee) self.invitee_users.append(invitee_user) except User.DoesNotExist: raise forms.ValidationError(_('There is no user "%s."') % invitee) has_invitation = bool(GroupInvitation.objects.filter( group=self.group, invitee=invitee_user)) if has_invitation: raise forms.ValidationError( _('"%s" already has an invitation.') % invitee) already_member = \ invitee_user.groups.filter(name=self.group.name).exists() if already_member: raise forms.ValidationError( _('"%s" is already a member of this group.') % invitee)
def resolve_user_or_group(self, old_id): """Resolve a user by its user id from old dudel.""" # connect to db conn = psycopg2.connect(self.conn_string) cursor = conn.cursor() cursor.execute('SELECT username FROM "user" WHERE id=%s', (old_id,)) username = cursor.fetchone() try: if username: return get_user_model().objects.get(username=username[0]) else: cursor.execute('SELECT name FROM "group" WHERE id=%s', (old_id,)) groupname = cursor.fetchone() if groupname: return Group.objects.get(name=groupname[0]) except ObjectDoesNotExist: return None
def test_switch_window(self): self.do_login(username="admin", password="password") self.get_url("admin:auth_user_change", self.user.id) self.click('#add_id_groups') old_window, new_window = self.switch_window() self.fill({'#id_name': 'My new group'}) self.switch_window(old_window) self.fill({'#id_first_name': 'My first name'}) self.switch_window(new_window) self.submit('input[name=_save]', window_closes=True) self.switch_window(old_window) self.submit('input[name=_save]') User = get_user_model() user = User.objects.get(id=self.user.id) self.assertEqual([g.name for g in user.groups.all()], ["My new group"]) self.assertEqual(user.first_name, "My first name")
def owner_search_fields(self): """ Returns all the fields that are CharFields except for password from the User model. For the built-in User model, that means username, first_name, last_name, and email. """ try: from django.contrib.auth import get_user_model except ImportError: # Django < 1.5 from django.contrib.auth.models import User else: User = get_user_model() return [ field.name for field in User._meta.fields if isinstance(field, models.CharField) and field.name != 'password' ]
def check_password(environ, username, password): """ Authenticates against Django's auth database mod_wsgi docs specify None, True, False as return value depending on whether the user exists and authenticates. """ UserModel = auth.get_user_model() # db connection state is managed similarly to the wsgi handler # as mod_wsgi may call these functions outside of a request/response cycle db.reset_queries() try: try: user = UserModel._default_manager.get_by_natural_key(username) except UserModel.DoesNotExist: return None if not user.is_active: return None return user.check_password(password) finally: db.close_old_connections()
def groups_for_user(environ, username): """ Authorizes a user based on groups """ UserModel = auth.get_user_model() db.reset_queries() try: try: user = UserModel._default_manager.get_by_natural_key(username) except UserModel.DoesNotExist: return [] if not user.is_active: return [] return [force_bytes(group.name) for group in user.groups.all()] finally: db.close_old_connections()
def user_is_registered(self, user): """Checks whether a ?User? is registered in the database. Parameters ---------- user Can be a Discord `User` object or `Member` object, or a user ID. """ if isinstance(user, discord.User) or isinstance(user, discord.Member): try: get_user_model().objects.get(id=user.id) return True except get_user_model().DoesNotExist: return False else: try: get_user_model().objects.get(id=user) return True except get_user_model().DoesNotExist: return False
def disable_admin_login(): ''' Disable admin login, but allow editing. amended from: https://stackoverflow.com/a/40008282/517560 ''' User = get_user_model() user, created = User.objects.update_or_create( id=1, defaults=dict( first_name='Default Admin', last_name='User', is_superuser=True, is_active=True, is_staff=True ) ) def no_login_has_permission(request): setattr(request, 'user', user) return True return no_login_has_permission
def blogger_required(view_func): """ Only lets bloggers to access the supplied view and redirects readers. """ def _check_blogger_or_reader(request, *args, **kwargs): UserModel = get_user_model() user_id = request.user.id user = UserModel.objects.select_related( 'profile', 'blog').prefetch_related().get(pk=user_id) # update user instance on request with related objects to save queries. request.user = user if user.profile.is_blogger(): try: if user.blog: return view_func(request, *args, **kwargs) except Blog.DoesNotExist: return redirect('blog:blog_create') else: return redirect('blog:user_blog', username=request.user.username) return wraps(view_func)(_check_blogger_or_reader)
def setUp(self): User = get_user_model() # create a user who does not want to get MyOtherNotification user = User(username='test', password='Safepass1.') user.save() usernotification = UserNotification(user=user) usernotification.save() # refresh the user self.user = User.objects.get(id=user.id) # add a notification notification = Notification(notification_class=MyOtherNotification.get_class_path()) notification.save() # disable the notification self.user.usernotification.disabled_notifications.add(notification) self.user = User.objects.get(id=user.id)
def authenticate_payload(payload): from rest_framework_sso.models import SessionToken user_model = get_user_model() if api_settings.VERIFY_SESSION_TOKEN: try: session_token = SessionToken.objects.\ active().\ select_related('user').\ get(pk=payload.get(claims.SESSION_ID), user_id=payload.get(claims.USER_ID)) user = session_token.user except SessionToken.DoesNotExist: raise exceptions.AuthenticationFailed(_('Invalid token.')) else: try: user = user_model.objects.get(pk=payload.get(claims.USER_ID)) except user_model.DoesNotExist: raise exceptions.AuthenticationFailed(_('Invalid token.')) if not user.is_active: raise exceptions.AuthenticationFailed(_('User inactive or deleted.')) return user
def reports(request): User = get_user_model() now = make_aware(datetime.now()) ctx = { 'page': 'admin-reports', 'users': map( lambda x: {'id': x.username, 'text': x.formatted_name}, User.objects.hide_meta() ), 'user_rates_form': UserRatesForm(), 'paid_task_form': PaidTaskForm(), 'now': now.strftime('%Y-%m-%d %H:%M:%S'), 'admin_report': True, 'paid_task_types': PaidTaskTypes, } return render(request, 'admin/reports.html', ctx)
def get_context_data(self, **kwargs_): User = get_user_model() language_code, project_code = split_pootle_path(self.pootle_path)[:2] offset = self.kwargs.get("offset", 0) top_scorers = User.top_scorers( project=project_code, language=language_code, limit=TOP_CONTRIBUTORS_CHUNK_SIZE + 1, offset=offset, ) return get_top_scorers_data( top_scorers, TOP_CONTRIBUTORS_CHUNK_SIZE )
def handle(self, **options): if bool(options['user']) == options['all']: raise CommandError("Either provide a 'user' to verify or " "use '--all' to verify all users") if options['all']: for user in get_user_model().objects.hide_meta(): try: utils.verify_user(user) self.stdout.write("Verified user '%s'" % user.username) except (ValueError, ValidationError) as e: self.stderr.write(e.message) if options['user']: for user in options['user']: try: utils.verify_user(self.get_user(user)) self.stdout.write("User '%s' has been verified" % user) except (ValueError, ValidationError) as e: self.stderr.write(e.message)
def get_user_by_email(email): """Retrieves auser by its email address. First it looks up the `EmailAddress` entries, and as a safety measure falls back to looking up the `User` entries (these addresses are sync'ed in theory). :param email: address of the user to look up. :return: `User` instance belonging to `email`, `None` otherwise. """ try: return EmailAddress.objects.get(email__iexact=email).user except EmailAddress.DoesNotExist: try: User = get_user_model() return User.objects.get(email__iexact=email) except User.DoesNotExist: return None
def check_users(app_configs=None, **kwargs): from django.contrib.auth import get_user_model errors = [] User = get_user_model() try: admin_user = User.objects.get(username='admin') except (User.DoesNotExist, OperationalError, ProgrammingError): pass else: if admin_user.check_password('admin'): errors.append(checks.Warning( _("The default 'admin' user still has a password set to " "'admin'."), hint=_("Remove the 'admin' user or change its password."), id="pootle.W016", )) return errors
def get(self, request): tasks = Task.objects.all().values("slug", "id") grades = [] usernames = get_user_model().objects.all().values_list( 'username', flat=True) for username in usernames: user_grades = {'username': username} task_submissions = TaskSubmission.objects \ .filter(user__username=username).values('task_id') \ .annotate(grade=Max('grade')) for grade in task_submissions: user_grades[grade['task_id']] = grade['grade'] grades.append(user_grades) context = { 'grades': grades, 'tasks': tasks } return render(request, self.template_name, context, status=200)
def clean(self): username = self.cleaned_data.get('username') password = self.cleaned_data.get('password') message = ERROR_MESSAGE if username and password: self.user_cache = authenticate( username=username, password=password) if self.user_cache is None: if u'@' in username: User = get_user_model() # Mistakenly entered e-mail address instead of username? Look it up. try: user = User.objects.get(email=username) except (User.DoesNotExist, User.MultipleObjectsReturned): # Nothing to do here, moving along. pass else: if user.check_password(password): message = _("Your e-mail address is not your username." " Try '%s' instead.") % user.username raise forms.ValidationError(message) elif not self.user_cache.is_active or not self.user_cache.is_staff: raise forms.ValidationError(message) return self.cleaned_data
def get(self, request, *args, **kwargs): """ Same as django.views.generic.edit.ProcessFormView.get(), but adds test cookie stuff """ # Get the username from a keycloak set header userid = request.META.get(settings.KEYCLOAK_USERNAME_HEADER) if userid: try: user = get_user_model().objects.get(userid=userid) except: user = get_user_model().objects.create_user( userid=userid, is_active=True) user.backend = 'django.contrib.auth.backends.ModelBackend' login(self.request, user) self.user = user return HttpResponseRedirect(self.get_success_url()) else: self.set_test_cookie() return super(LoginView, self).get(request, *args, **kwargs)
def test_user_can_have_multiple_teams_which_have_multiple_users(self): o = Organisation(name='New Org') o.save() t1 = Team(name='Team Awesome', organisation=o) t1.save() t2 = Team(name='Team Great', organisation=o) t2.save() u1 = get_user_model().objects.create_user(userid='teamplayer') u1.teams.add(t1) u1.teams.add(t2) u1.save() u2 = get_user_model().objects.create_user(userid='teamplayer2') u2.teams.add(t2) u2.save() self.assertIn(u1, t1.user_set.all()) self.assertIn(u1, t2.user_set.all()) self.assertNotIn(u2, t1.user_set.all()) self.assertIn(u2, t2.user_set.all()) self.assertEqual(len(t1.user_set.all()), 1) self.assertEqual(len(t2.user_set.all()), 2)
def create_organisation(name, num_teams=0, num_members=0, usernames={}): o = Organisation(name=name) o.save() user_global_id = 0 for x in range(0, num_teams): t = Team(name='New Team %d' % (x + 1), organisation=o) t.save() for y in range(user_global_id, num_members + user_global_id): if y in usernames.keys(): username = usernames[y] else: username = 'Team Member %d' % (y + 1) u = get_user_model().objects.create_user( userid='teammember%d' % (y + 1), name=username, ) u.teams.add(t) u.save() t.save() # Before we go to the next team, increment start ID for member name user_global_id += num_members return o
def setUp(self): # Disable Signals post_save.disconnect(post_save_diary, sender=Diary) # Run celery task synchronous app.conf.update(CELERY_ALWAYS_EAGER=True) self.test_username = TEST_USERNAME self.test_password = TEST_PASSWORD self.test_email = TEST_EMAIL # Create a user self.user = get_user_model().objects.create_user( username=self.test_username, password=self.test_password, email=self.test_email, ) # Login self.client = APIClient() self.client.login( username=self.test_username, password=self.test_password, )
def run(self, user_id): user = get_user_model().objects.get(pk=user_id) # Send email to only user who has email except for TEST_EMAIL if user.email and user.email != settings.TEST_EMAIL: send_email( sender=self.email_sender, receiver=user.email, subject=self.email_subject.format( username=user.username ), html=render_to_string( self.email_template, context={ "username": user.username, "verification_key": user.profile.verification_key, }, ), )
def test_can_run_queryset_locally(self): from django.contrib.auth import get_user_model User = get_user_model() users = [User.objects.create(username=str(i)) for i in range(10)] qs = User.objects.all() self.assertEqual(qs.count(), 10) local_qs = qs.locally().all() self.assertTrue(isinstance(local_qs, query.QuerySet)) self.assertEqual(local_qs.count(), 10) with self.assertNumQueries(0): self.assertEqual(local_qs.count(), 10) self.assertEqual(local_qs.filter(username='1').count(), 1) for user in local_qs.order_by('-username'): pass
def _notify_reviewers(self, start, end, reports): """Notify reviewers on their unverified reports.""" User = get_user_model() reviewers = User.objects.all_reviewers().filter(email__isnull=False) subject = '[Timed] Verification of reports' from_email = settings.DEFAULT_FROM_EMAIL mails = [] for reviewer in reviewers: if reports.filter(task__project__reviewers=reviewer).exists(): body = render_to_string( 'mail/notify_reviewers_unverified.txt', { # we need start and end date in system format 'start': str(start), 'end': str(end), 'reviewer': reviewer, 'protocol': settings.HOST_PROTOCOL, 'domain': settings.HOST_DOMAIN, }, using='text' ) mails.append((subject, body, from_email, [reviewer.email])) if len(mails) > 0: send_mass_mail(mails)
def authjwt_method(token): """ an authentication method using rest_framework_jwt """ import jwt from rest_framework_jwt.authentication import (jwt_decode_handler, jwt_get_username_from_payload) try: payload = jwt_decode_handler(token) except (jwt.ExpiredSignature, jwt.DecodeError, jwt.InvalidTokenError): return None User = get_user_model() username = jwt_get_username_from_payload(payload) if not username: # pragma: no cover return None try: user = User.objects.get_by_natural_key(username) except User.DoesNotExist: # pragma: no cover return None return user
def create_user(self, form, commit=True, model=None, **kwargs): User = model if User is None: User = get_user_model() user = User(**kwargs) username = form.cleaned_data.get("username") if username is None: username = self.generate_username(form) user.username = username user.email = form.cleaned_data["email"].strip() password = form.cleaned_data.get("password") if password: user.set_password(password) else: user.set_unusable_password() if commit: user.save() return user
def send_email(self, email): User = get_user_model() protocol = getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http") current_site = get_current_site(self.request) email_qs = EmailAddress.objects.filter(email__iexact=email) for user in User.objects.filter(pk__in=email_qs.values("user")): uid = int_to_base36(user.id) token = self.make_token(user) password_reset_url = "{0}://{1}{2}".format( protocol, current_site.domain, reverse("account_password_reset_token", kwargs=dict(uidb36=uid, token=token)) ) ctx = { "user": user, "current_site": current_site, "password_reset_url": password_reset_url, } hookset.send_password_reset_email([user.email], ctx)
def test_user_can_login(self): user = get_user_model().objects.create_user('test', email='test', password='test') client = APIClient() response = client.post(reverse('auth-login'), {'username': 'test', 'password': 'test'}) self.assertEqual(response.status_code, status.HTTP_200_OK) auth_token = AuthToken.objects.get(user=user) data = json.loads(response.content.decode('utf-8')) token = data['token'] self.assertEqual(token, auth_token.key)