我们从 Python 开源项目中提取了以下 41 个代码示例，用于说明如何使用 google.appengine.api.users.is_current_user_admin()。
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Tell whether the current visitor may access ``other_application``.

    When ``request.env.web2py_runtime_gae`` is set, the decision is delegated
    to the App Engine users API: admins get ``True``; everybody else either
    triggers an ``HTTP(200)`` page carrying a sign-in link (``gae_login``
    true) or gets ``False``.

    Otherwise the session of ``other_application`` is consulted: it must be
    authorized and its ``last_time`` must be newer than ``expiration`` seconds
    ago.  A valid session has its ``last_time`` refreshed and stored again.
    The (truthy or falsy) result of that check is returned as-is.
    """
    if not request.env.web2py_runtime_gae:
        now = time.time()
        oldest_allowed = now - expiration
        session = get_session(request, other_application)
        still_valid = (session.authorized and session.last_time and
                       session.last_time > oldest_allowed)
        if still_valid:
            # Sliding expiration: every successful check renews the session.
            session.last_time = now
            set_session(request, session, other_application)
        return still_valid

    from google.appengine.api import users
    if users.is_current_user_admin():
        return True
    if not gae_login:
        return False
    login_html = '<a href="%s">Sign in with your google account</a>.' \
        % users.create_login_url(request.env.path_info)
    raise HTTP(200, '<html><body>%s</body></html>' % login_html)
def retrieve_user_from_gae(gae_user):
    """Return the user record matching ``gae_user``, creating it if missing.

    The record is looked up by the auth id ``'federated_<user_id>'``.  An
    existing record that is not yet flagged as admin is promoted (and saved)
    when the current App Engine user is an admin.  When no record exists a
    new one is created through ``auth.create_user_db`` using the account's
    email address as both username and email.
    """
    auth_id = 'federated_%s' % gae_user.user_id()
    existing = model.User.get_by('auth_ids', auth_id)

    if not existing:
        return auth.create_user_db(
            auth_id=auth_id,
            name=util.create_name_from_email(gae_user.email()),
            username=gae_user.email(),
            email=gae_user.email(),
            verified=True,
            admin=users.is_current_user_admin(),
        )

    if not existing.admin and users.is_current_user_admin():
        existing.admin = True
        existing.put()
    return existing
def main():
    """CGI-style request handler to dump the configuration.

    Put this in your app.yaml to enable (you can pick any URL):

    - url: /lib_config
      script: $PYTHON_LIB/google/appengine/api/lib_config.py

    Note: unless you are using the SDK, you must be admin.

    The response is written to standard output with Python 2 ``print``
    statements: a redirect to the login URL for anonymous visitors, a 403 for
    logged-in non-admins, and the registry dump otherwise.
    """
    # The admin check is skipped when SERVER_SOFTWARE starts with 'Dev'.
    if not os.getenv('SERVER_SOFTWARE', '').startswith('Dev'):
        from google.appengine.api import users
        if not users.is_current_user_admin():
            if users.get_current_user() is None:
                # Nobody is logged in: send the visitor to the login page.
                print 'Status: 302'
                print 'Location:', users.create_login_url(os.getenv('PATH_INFO', ''))
            else:
                # Logged in, but not an admin.
                print 'Status: 403'
                print
                print 'Forbidden'
            return

    print 'Content-type: text/plain'
    # NOTE(review): statement boundaries were reconstructed from a flattened
    # listing.  This assumes a bare ``print`` ends the header section and that
    # ``_dump()`` writes its own output -- confirm against the upstream file.
    print
    _default_registry._dump()
def get_djangouser_for_user(cls, user):
    """Return the Django-side user entity for ``user``, creating it if needed.

    An existing entity has its ``is_staff``/``is_superuser`` flags brought in
    line with the App Engine admin status (and is saved only when they
    differ), provided ``settings.AUTH_ADMIN_USER_AS_SUPERUSER`` is enabled
    (it defaults to ``True``).  A missing entity is created via
    ``cls.create_djangouser_for_user``, marked active, optionally promoted,
    and saved.
    """
    django_user = cls.all().filter('user =', user).get()

    if not django_user:
        django_user = cls.create_djangouser_for_user(user)
        django_user.is_active = True
        promote = (getattr(settings, 'AUTH_ADMIN_USER_AS_SUPERUSER', True)
                   and users.is_current_user_admin())
        if promote:
            django_user.is_staff = True
            django_user.is_superuser = True
        django_user.put()
        return django_user

    if getattr(settings, 'AUTH_ADMIN_USER_AS_SUPERUSER', True):
        is_admin = users.is_current_user_admin()
        flags_differ = (django_user.is_staff != is_admin or
                        django_user.is_superuser != is_admin)
        if flags_differ:
            # Write only when something changed, to avoid a needless put().
            django_user.is_superuser = django_user.is_staff = is_admin
            django_user.put()
    return django_user
def sys_processor():
    """Return a dict of helper callables.

    The dict exposes four helpers under the keys ``get_app_var``,
    ``is_logged_in``, ``is_admin`` and ``login_link``.
    """
    def _get_app_var(key):
        # Only keys listed in APPVAR_DISPLAY_LIST are looked up.
        if key not in constants.APPVAR_DISPLAY_LIST:
            return None
        return models.ApplicationVariable.get_app_var(key)

    def _is_logged_in():
        return bool(users.get_current_user())

    def _is_admin():
        return users.is_current_user_admin()

    def _login_link(endpoint):
        return users.create_login_url(endpoint)

    return {
        'get_app_var': _get_app_var,
        'is_logged_in': _is_logged_in,
        'is_admin': _is_admin,
        'login_link': _login_link,
    }
def getorcreate(user):
    """Return the ``UserPrefs`` record for ``user``, creating or migrating it.

    Lookup order:

    1. memcache, keyed by the user's id;
    2. the datastore, by id (new-style records use the user id as their id);
    3. a query on the ``userid`` property (old-style records).

    An old-style record is copied into a new-style one (keeping its access
    and admin flags, ``activeSemester``, ``groupaccess`` and ``groupadmin``)
    and then deleted.  If nothing is found, a new record is created whose
    access and admin flags both equal the current App Engine admin status.
    Every record that was not served from memcache is pushed to memcache
    before being returned.
    """
    user_id = user.user_id()

    cached = memcache.get(user_id)
    if cached is not None:
        return cached

    # New records have user_id as id.
    userprefs = UserPrefs.get_by_id(user_id)
    if userprefs is not None:
        userprefs.updateMemcache()
        return userprefs

    # Fall back to old records, which only carry the id in ``userid``.
    legacy_records = UserPrefs.query(UserPrefs.userid == user_id).fetch()
    if legacy_records:
        # Old record: re-create it with user_id as id, then drop the original.
        # (The former ``olduser != None`` guard was dead code: had it ever
        # been false, ``userprefs`` would have stayed None and the
        # updateMemcache() call below would have raised.)
        olduser = legacy_records[0]
        userprefs = UserPrefs.create(user, olduser.hasAccess(),
                                     olduser.isAdmin())
        userprefs.activeSemester = olduser.activeSemester
        userprefs.groupaccess = olduser.groupaccess
        userprefs.groupadmin = olduser.groupadmin
        userprefs.put()
        olduser.key.delete()
    else:
        is_admin = users.is_current_user_admin()
        userprefs = UserPrefs.create(user, is_admin, is_admin)
        userprefs.put()

    userprefs.updateMemcache()
    return userprefs
def retrieve_user_from_google(google_user):
    """Look up, or create, the user record belonging to ``google_user``.

    Records are keyed by the auth id ``'federated_<user_id>'``.  A record
    that already exists is returned after being promoted to admin (and saved)
    if it was not an admin yet but the current App Engine user is one.
    Otherwise ``auth.create_or_get_user_db`` is called with the account's
    email address used for both ``username`` and ``email``.
    """
    auth_id = 'federated_%s' % google_user.user_id()
    user_db = model.User.get_by('auth_ids', auth_id)

    if user_db:
        needs_promotion = not user_db.admin and users.is_current_user_admin()
        if needs_promotion:
            user_db.admin = True
            user_db.put()
        return user_db

    new_user_fields = dict(
        auth_id=auth_id,
        name=util.create_name_from_email(google_user.email()),
        username=google_user.email(),
        email=google_user.email(),
        verified=True,
        admin=users.is_current_user_admin(),
    )
    return auth.create_or_get_user_db(**new_user_fields)
def check_credentials(request, other_application='admin',
                      expiration=60 * 60, gae_login=True):
    """Check that the user is authorized to access ``other_application``.

    On the App Engine runtime (``request.env.web2py_runtime_gae``) only
    application admins pass.  Non-admins are shown a Google sign-in link via
    ``raise HTTP(200, ...)`` when ``gae_login`` is true, and get ``False``
    otherwise.

    Elsewhere the stored session for ``other_application`` decides: it must
    be authorized and have been used within the last ``expiration`` seconds.
    On success its timestamp is renewed and the session is saved.  The value
    of the session check itself is returned.
    """
    if request.env.web2py_runtime_gae:
        from google.appengine.api import users
        if users.is_current_user_admin():
            return True
        if gae_login:
            login_html = '<a href="%s">Sign in with your google account</a>.' \
                % users.create_login_url(request.env.path_info)
            raise HTTP(200, '<html><body>%s</body></html>' % login_html)
        return False

    checked_at = time.time()
    cutoff = checked_at - expiration
    stored = get_session(request, other_application)
    authorized = (stored.authorized and stored.last_time and
                  stored.last_time > cutoff)
    if authorized:
        stored.last_time = checked_at
        set_session(request, stored, other_application)
    return authorized
def admin_required(func):
    """Decorator restricting a view to App Engine administrators.

    Anonymous visitors are redirected to the login URL for the current
    request; logged-in non-admins are rejected with ``abort(401)``.
    """
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if not users.get_current_user():
            return redirect(users.create_login_url(request.url))
        if not users.is_current_user_admin():
            abort(401)  # Unauthorized
        return func(*args, **kwargs)
    return decorated_view
def is_admin():
    """Return a truthy value only for a logged-in App Engine administrator.

    When nobody is logged in, the falsy result of ``users.get_current_user()``
    is returned unchanged; otherwise the admin flag is returned.
    """
    current_user = users.get_current_user()
    if not current_user:
        return current_user
    return users.is_current_user_admin()
def admin_required(handler_method):
    """Decorator requiring an application admin for a GET handler.

    Usage::

        @admin_required
        def get(self):
            user = users.get_current_user()
            self.response.out.write('Hello, ' + user.nickname())

    Visitors who are not logged in are redirected to a login page that leads
    back to the request URL.  Because that round trip always comes back as a
    GET request, the decorator refuses every other method with a 400.
    Logged-in users who are not admins are rejected with a 403.
    """
    def check_admin(self, *args, **kwargs):
        if self.request.method != 'GET':
            self.abort(400, detail='The admin_required decorator '
                                   'can only be used for GET requests.')

        if not users.get_current_user():
            return self.redirect(users.create_login_url(self.request.url))

        if users.is_current_user_admin():
            # The wrapped handler's return value is deliberately not
            # propagated, exactly as before.
            handler_method(self, *args, **kwargs)
        else:
            self.abort(403)

    return check_admin
def is_user_app_admin():
    """Return the App Engine admin flag of the current user, logging it at DEBUG."""
    admin_flag = users.is_current_user_admin()
    logging.debug("is_user_app_admin: %s", admin_flag)
    return admin_flag
def get_current_user(self):
    """Return the current App Engine user with an ``administrator`` attribute.

    When nobody is logged in, the falsy value from the users API is returned
    unchanged and no attribute is set.
    """
    current = users.get_current_user()
    if not current:
        return current
    current.administrator = users.is_current_user_admin()
    return current
def login(self, skey="", *args, **kwargs):
    """Log in the current Google account user, creating the user entry if needed.

    With a logged-in App Engine user, the matching skeleton is looked up by
    ``uid`` and then by lower-cased email.  A completely new user is only
    added if registration is enabled or the user is an App Engine admin;
    otherwise ``errors.Forbidden`` is raised.  The entry is written back when
    it is new or its ``lastlogin`` is more than 30 minutes old, and the
    authentication flow is continued with the entry's key.

    Without a logged-in user, ``errors.Redirect`` to the Google login URL is
    raised.

    ``skey``, ``*args`` and ``**kwargs`` are accepted but not used here.

    NOTE(review): the statement nesting below was reconstructed from a
    flattened listing -- confirm against the upstream file.
    """
    if users.get_current_user():
        addSkel = skeletonByKind(self.userModule.addSkel().kindName)  # Ensure that we have the full skeleton
        currentUser = users.get_current_user()
        uid = currentUser.user_id()
        userSkel = addSkel().all().filter("uid =", uid).getSkel()
        if not userSkel:
            # We'll try again - checking if there's already a user with that email
            userSkel = addSkel().all().filter("name.idx =", currentUser.email().lower()).getSkel()
            if not userSkel:  # Still no luck - it's a completely new user
                if not self.registrationEnabled and not users.is_current_user_admin():
                    # Registration is disabled, it's a new user and that user is not admin
                    logging.warning("Denying registration of %s", currentUser.email())
                    raise errors.Forbidden("Registration for new users is disabled")
                userSkel = addSkel()  # We'll add a new user
            # Reached for brand-new users and for users matched by email only:
            # both get the uid and email stamped and are treated as additions.
            userSkel["uid"] = uid
            userSkel["name"] = currentUser.email()
            isAdd = True
        else:
            isAdd = False
        now = datetime.datetime.now()
        if isAdd or (now-userSkel["lastlogin"]) > datetime.timedelta(minutes=30):
            # Conserve DB-Writes: Update the user max once in 30 Minutes
            userSkel["lastlogin"] = now
            if users.is_current_user_admin():
                # App Engine admins always receive the "root" access flag.
                if not userSkel["access"]:
                    userSkel["access"] = []
                if not "root" in userSkel["access"]:
                    userSkel["access"].append("root")
                userSkel["gaeadmin"] = True
            else:
                userSkel["gaeadmin"] = False
            # NOTE(review): the write happens inside an assert; under
            # ``python -O`` asserts are stripped and toDB() would never run.
            assert userSkel.toDB()
        return self.userModule.continueAuthenticationFlow(self, userSkel["key"])
    raise errors.Redirect( users.create_login_url( self.modulePath+"/login") )
def canCall( self ):
    """
        Report whether the current user may execute this task.

        The answer is the App Engine admin flag of the current user.

        @returns bool
    """
    allowed = users.is_current_user_admin()
    return allowed
def register(flask_request):
    """Register the logged-in App Engine user from the request's JSON body.

    The body must contain ``nick``; ``team_id``, ``team_name`` and
    ``team_code`` are optional.  The new user is promoted when the App Engine
    account is an administrator.

    Raises:
        errors.LoginError: if no App Engine user is logged in.
    """
    gae_user = users.get_current_user()
    if not gae_user:
        raise errors.LoginError(
            'Cannot register if not logged into AppEngine.')

    payload = flask_request.get_json()
    email = gae_user.email()
    nick = payload['nick']
    team_id = payload.get('team_id')
    team_name = payload.get('team_name')
    team_code = payload.get('team_code')

    new_user = controllers.register_user(
        email, nick, '', team_id, team_name, team_code)
    if users.is_current_user_admin():
        new_user.promote()
    return new_user
def get(self):
    """Write a one-line message describing the visitor's login and admin status."""
    if not users.get_current_user():
        message = 'You are not logged in.'
    elif users.is_current_user_admin():
        message = 'You are an administrator.'
    else:
        message = 'You are not an administrator.'
    self.response.write(message)
def export():
    """Fan the export of shared sessions out over ``NUM_TASKS`` queue tasks.

    Only a logged-in App Engine admin may trigger the export; everybody else
    receives ``('Admin access only', 403)``.  The shared sessions are counted
    and one ``/tasks/process_export`` task is enqueued per bucket, each
    delayed a further 60 seconds.

    Returns:
        A ``(body, status)`` tuple.
    """
    user = users.get_current_user()
    # Authorize first so unauthorized requests never trigger the count query.
    if not (user and users.is_current_user_admin()):
        return 'Admin access only', 403

    # NDB filters are built with ``==``; do not rewrite this as ``is True``.
    total_shared = Session.query(Session.shared == True).count()  # noqa: E712

    # Guard the divisor: NUM_TASKS == 1 would otherwise divide by zero.
    bucket_size = max(1, total_shared // max(1, NUM_TASKS - 1))
    for i in range(NUM_TASKS):
        # start a task with delay of 60*i seconds
        taskqueue.add(url='/tasks/process_export', method='GET',
                      params={'bucket': i, 'bucket_size': bucket_size},
                      countdown=60 * i)
    return 'Trigerred for %d queries' % total_shared, 200
def CheckIsAdmin(self):
    """Authorize the request, writing an error response on failure.

    Three mechanisms are tried in order until one succeeds: the App Engine
    admin flag, the optional ``config.CUSTOM_ENVIRONMENT_AUTHENTICATION``
    pair ``(env_var, allowed_values)``, and the OAuth admin check with
    ``self.OAUTH_SCOPES``.  An authorized request must also carry the
    ``X-appcfg-api-version`` header.

    Returns:
        ``True`` when the request may proceed.  ``False`` after a 401 (not an
        admin) or 403 (header missing) plain-text response has been written.
    """
    def reject(status, message):
        # Same call order as before: status, body, then the header.
        self.response.set_status(status)
        self.response.out.write(message)
        self.response.headers['Content-Type'] = 'text/plain'
        return False

    authorized = bool(users.is_current_user_admin())

    if not authorized and config.CUSTOM_ENVIRONMENT_AUTHENTICATION:
        if len(config.CUSTOM_ENVIRONMENT_AUTHENTICATION) != 2:
            logging.warning('remoteapi_CUSTOM_ENVIRONMENT_AUTHENTICATION is '
                            'configured incorrectly.')
        else:
            var, values = config.CUSTOM_ENVIRONMENT_AUTHENTICATION
            if os.getenv(var) in values:
                authorized = True

    if not authorized:
        try:
            authorized = (
                oauth.is_current_user_admin(_scope=self.OAUTH_SCOPES))
        except oauth.OAuthRequestError:
            # An OAuth request error simply leaves the request unauthorized.
            pass

    if not authorized:
        return reject(
            401, 'You must be logged in as an administrator to access this.')
    if 'X-appcfg-api-version' not in self.request.headers:
        return reject(403, 'This request did not contain a necessary header')
    return True
def __call__(self, environ, start_response):
    """WSGI entry point that lets only admins through outside the dev server.

    When ``SERVER_SOFTWARE`` starts with ``'Dev'``, or the current user is an
    App Engine admin, the wrapped application handles the request.  Anonymous
    visitors are redirected to the login URL; logged-in non-admins get a 403.
    """
    on_dev_server = environ.get('SERVER_SOFTWARE', '').startswith('Dev')
    if on_dev_server or users.is_current_user_admin():
        return self._application(environ, start_response)

    if users.get_current_user() is None:
        # NOTE(review): PATH_INFO is read from the process environment here,
        # not from the WSGI ``environ`` -- kept as in the original.
        login_url = users.create_login_url(os.getenv('PATH_INFO', ''))
        start_response('302 Found', [('Location', login_url)])
        return []

    start_response('403 Forbidden', [])
    return ['Forbidden\n']
def get(self):
    """Render the interactive console page for admins; answer 404 otherwise."""
    if not users.is_current_user_admin():
        logging.warning(
            'Non admin user from IP %s attempted to use interactive console',
            self.request.remote_addr)
        self.error(404)
        return
    self.generate('interactive.html')
def post(self):
    """Run the submitted ``code`` and render whatever it printed.

    Non-admins get a logged warning and a 404.  For admins, when the console
    is enabled, the ``code`` request parameter is compiled and executed with
    ``sys.stdout`` redirected into a buffer; a traceback is written to the
    same buffer if the code raises.  When the console is disabled, a fixed
    explanatory message is rendered instead.
    """
    if not users.is_current_user_admin():
        logging.warning(
            'Non admin user from IP %s attempted to use interactive console',
            self.request.remote_addr)
        self.error(404)
        return

    if self.interactive_console_enabled():
        original_stdout = sys.stdout
        captured = cStringIO.StringIO()
        try:
            sys.stdout = captured
            source = self.request.get('code').replace('\r\n', '\n')
            try:
                # SECURITY: runs arbitrary request-supplied code in this
                # module's globals; the admin check above is the only guard.
                exec(compile(source, '<string>', 'exec'), globals())
            except Exception:
                traceback.print_exc(file=captured)
        finally:
            # Always restore stdout, even if something unexpected escapes.
            sys.stdout = original_stdout
        results = captured.getvalue()
    else:
        # NOTE(review): whitespace inside this message is taken from a
        # flattened listing; the upstream literal may contain line breaks.
        results = (
            'The interactive console has been disabled for security '
            'because the dev_appserver is listening on a non-default '
            'address. If you would like to re-enable the console, invoke '
            'dev_appserver with the --enable_console argument. See '
            'https://developers.google.com/appengine/docs/python/tools/'
            'devserver#The_Interactive_Console for more information.')

    self.generate('interactive-output.html', {'output': results})
def is_server_admin(self):
    """Return the App Engine admin flag of the current user."""
    admin_flag = users.is_current_user_admin()
    return admin_flag
def auth_user(fn):
    """Decorator that authenticates a view through a Google bearer token.

    The ``Authorization`` header of the request is forwarded to Google's
    userinfo endpoint and the returned email address identifies the user:

    * an existing user with that email is used as-is;
    * otherwise a matching ``PendingUser`` is activated;
    * otherwise a new active user is created, with the local part of the
      email as username.

    The resolved user is stored on ``request.user`` before the wrapped view
    runs.  A missing header, a userinfo payload without an email, or failure
    to resolve a user yields ``HttpResponseForbidden``.

    NOTE(review): the original fell into a NameError (undefined ``email``) on
    the "no pending user" path and would have returned a model instance
    instead of a response.  Continuing into the view with the newly created
    user is the assumed intent -- confirm.
    """
    @functools.wraps(fn)
    def _wrapped(request, *args, **kwargs):
        bearer = request.META.get('HTTP_AUTHORIZATION')
        if not bearer:
            return HttpResponseForbidden()

        url = "https://www.googleapis.com/userinfo/v2/me"
        result = urlfetch.fetch(url=url,
                                method=urlfetch.GET,
                                headers={"Authorization": bearer})
        try:
            email = json.loads(result.content)['email']
        except (ValueError, KeyError, TypeError):
            # Not JSON, or no email in it (e.g. an error payload).
            return HttpResponseForbidden()

        gae_user = users.get_current_user()
        is_admin = users.is_current_user_admin()

        User = get_user_model()
        django_user = None
        try:
            logging.debug("Getting django user")
            django_user = User.objects.get(email=email)
        except User.DoesNotExist:
            logging.info(
                "User does not exist in Montage. Checking pending users")
            try:
                pending_user = PendingUser.objects.get(email=email)
            except PendingUser.DoesNotExist:
                logging.info("No pending user record for this email")
                django_user, _created = User.objects.get_or_create(
                    email=email,
                    defaults={
                        'username': email.split('@')[0],
                        'is_active': True
                    }
                )
            else:
                logging.info("Pending user record found. Activating user.")
                django_user = activate_pending_user(
                    pending_user, gae_user, is_admin)
        except AttributeError:
            return HttpResponseForbidden()
        else:
            # NOTE(review): the message mentions an update, but the
            # update_user() call was already disabled in the original.
            logging.info("User found. Updating gaia_id and superuser status")

        if not django_user:
            return HttpResponseForbidden()
        request.user = django_user
        return fn(request, *args, **kwargs)
    return _wrapped