我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 tornado.web.HTTPError()。
def _read_file(self, blob, format): """Reads a non-notebook file. blob: instance of :class:`google.cloud.storage.Blob`. format: If "text", the contents will be decoded as UTF-8. If "base64", the raw bytes contents will be encoded as base64. If not specified, try to decode as UTF-8, and fall back to base64 """ bcontent = blob.download_as_string() if format is None or format == "text": # Try to interpret as unicode if format is unknown or if unicode # was explicitly requested. try: return bcontent.decode("utf8"), "text" except UnicodeError: if format == "text": raise web.HTTPError( 400, "%s is not UTF-8 encoded" % self._get_blob_path(blob), reason="bad format", ) return base64.encodebytes(bcontent).decode("ascii"), "base64"
def authorized(admin_only=False):
    """Build a decorator that restricts a handler method to logged-in users.

    Anonymous GET requests are redirected to the login URL with the ``next``
    request argument appended; anonymous requests with any other method get
    a 403. With ``admin_only`` set, logged-in non-admins also get a 403.
    The wrapped handler's return value is discarded.
    """
    def wrap(user_handler):
        @wraps(user_handler)
        def authorized_handler(self, *args, **kwargs):
            self.set_cache(is_public=False)
            req = self.request
            if not self.current_user_id:
                if req.method != 'GET':
                    raise HTTPError(403)
                next_url = self.get_argument('next', '/')
                # HTTP/1.0 clients are sent 302 instead of 303.
                code = 302 if req.version == 'HTTP/1.0' else 303
                self.redirect(self.get_login_url() + "?next=" + next_url,
                              status=code)
                return
            if admin_only and not self.is_admin:
                raise HTTPError(403)
            user_handler(self, *args, **kwargs)
        return authorized_handler
    return wrap
def put(self, operation):
    """Handle PUT requests.

    PUT requests may be sent to ``/``, ``/putpost`` or ``/putcomment``:
    ``/`` and ``/putpost`` register a post, ``/putcomment`` registers a
    comment.

    An empty ``operation`` defaults to ``'putpost'``; an unknown one raises
    ``HTTPError(404)``. The selected method's result is yielded and the
    value sent back into the generator is written to the response.
    """
    handlers = {'putpost': self.put_post,
                'putcomment': self.put_comment}
    try:
        handler = handlers[operation or 'putpost']
    except KeyError:
        raise HTTPError(404)
    result = yield handler()
    self.write(result)
def error_response(func):
    """Decorate a handler method so exceptions become JSON error responses.

    On success the wrapped function's result is returned unchanged. On any
    ``Exception`` the status from ``error_status`` is set and a JSON body
    ``{"errors": error_format(ex)}`` is written; the wrapper then returns
    ``None``. Exceptions other than ``web.HTTPError``, ``ExecutionError``
    and ``GraphQLError`` are additionally logged with their traceback.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as ex:
            expected = isinstance(
                ex, (web.HTTPError, ExecutionError, GraphQLError))
            if not expected:
                tb = ''.join(traceback.format_exception(*sys.exc_info()))
                app_log.error('Error: {0} {1}'.format(ex, tb))
            self.set_status(error_status(ex))
            error_json = json_encode({'errors': error_format(ex)})
            app_log.debug('error_json: %s', error_json)
            self.write(error_json)
    return wrapper
def post(self):
    """Create a user from the ``mobile``, ``pwd`` and ``school_id`` arguments.

    Raises ``HTTPError(**errors.status_21)`` when a user with the same
    mobile already exists. Otherwise the school id is validated, the school
    is loaded and checked, the user is saved, and the user dict (with a
    freshly created ``token``) is written as JSON.
    """
    # TODO: the original TODO text here was lost to a bad encoding.
    mobile = self.get_argument('mobile')
    pwd = self.get_argument('pwd')

    # Reject the request when the mobile number is already taken.
    existing = yield User.objects.filter(mobile=mobile).find_all()
    if len(existing) > 0:
        raise HTTPError(**errors.status_21)

    school_id = self.get_argument('school_id')
    self.vaildate_id(school_id)
    school = yield School.objects.get(self.get_argument('school_id'))
    self.check_none(school)

    # Persist the new user; the nickname is hard-coded to 'test'.
    user = User(mobile=mobile,
                password=pwd,
                nickname='test',
                school_id=ObjectId(school_id))
    yield user.save()

    payload = user.to_dict()
    payload['token'] = token_manager.create_token(str(payload['id']))
    self.write_json(payload)
def get_file_checkpoint(self, checkpoint_id, path):
    """Return the content of a checkpoint for a non-notebook file.

    The result is a dict of the form::

        {
            "type": "file",
            "content": <str>,
            "format": {"text","base64"},
        }

    Raises:
        web.HTTPError: 404 when no blob exists at the checkpoint path.
    """
    self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
    checkpoint_path = self._get_checkpoint_path(checkpoint_id, path)
    found, blob = self.parent._fetch(checkpoint_path)
    if not found:
        message = u"No such checkpoint: %s for %s" % (checkpoint_id, path)
        raise web.HTTPError(404, message)
    # Format is left unspecified so the reader picks text or base64.
    content, fmt = self.parent._read_file(blob, None)
    return {"type": "file", "content": content, "format": fmt}
def get_notebook_checkpoint(self, checkpoint_id, path):
    """Return the content of a checkpoint for a notebook.

    The result is a dict of the form::

        {
            "type": "notebook",
            "content": <output of nbformat.read>,
        }

    Raises:
        web.HTTPError: 404 when no blob exists at the checkpoint path.
    """
    self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
    checkpoint_path = self._get_checkpoint_path(checkpoint_id, path)
    found, blob = self.parent._fetch(checkpoint_path)
    if not found:
        message = u"No such checkpoint: %s for %s" % (checkpoint_id, path)
        raise web.HTTPError(404, message)
    notebook = self.parent._read_notebook(blob)
    return {"type": "notebook", "content": notebook}
def _save_directory(self, path, model):
    """Create a directory in GCS.

    Does nothing (beyond a debug log) when something that is not a ``Blob``
    already exists at ``path``. An empty bucket path creates the bucket
    itself; otherwise an empty ``application/x-directory`` object is
    uploaded at the bucket path. ``model`` is not used.

    Raises:
        web.HTTPError: 400 when ``path`` already exists as a ``Blob``.
    """
    exists, obj = self._fetch(path)
    if exists:
        if isinstance(obj, Blob):
            raise web.HTTPError(400, u"Not a directory: %s" % path)
        self.log.debug("Directory %r already exists", path)
        return

    bucket_name, bucket_path = self._parse_path(path)
    if bucket_path != "":
        bucket = self._get_bucket(bucket_name, throw=True)
        marker = bucket.blob(bucket_path)
        marker.upload_from_string(
            b"", content_type="application/x-directory")
    else:
        self.client.create_bucket(bucket_name)
def get(self, geneid=None):
    """Handle ``/gene/<geneid>``.

    geneid can be entrezgene, ensemblgene, retired entrezgene ids.
        /gene/1017
        /gene/1017?fields=symbol,name
        /gene/1017?fields=symbol,name,reporter.HG-U133_Plus_2

    Raises ``HTTPError(404)`` when ``geneid`` is empty or the lookup
    returns nothing.
    """
    if not geneid:
        raise HTTPError(404)

    params = self.get_query_params()
    # Defaults apply only when the caller did not supply the parameter.
    params.setdefault('scopes', 'entrezgene,ensemblgene,retired')
    params.setdefault('species', 'all')
    gene = yield Task(self.esq.get_gene2, geneid, **params)
    if not gene:
        raise HTTPError(404)

    self.return_json(gene)
    self.ga_track(event={'category': 'v2_api', 'action': 'gene_get'})
async def get(self, address, hash, format, include_body=True):
    """Serve the stored avatar image for ``address`` in the given format.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a SyntaxError inside a plain ``def``.

    Args:
        address: Value matched against the ``toshi_id`` column.
        hash: Hash prefix to match, or ``None`` for the most recently
            modified avatar.
        format: Image format; ``PNG``, ``JPG`` or ``JPEG`` in any case.
            ``JPG`` is treated as ``JPEG``.
        include_body: Unused here; kept for interface compatibility.

    Raises:
        HTTPError: 404 for an unsupported format or when no row matches.
    """
    format = format.upper()
    if format not in ['PNG', 'JPG', 'JPEG']:
        raise HTTPError(404)
    if format == 'JPG':
        format = 'JPEG'

    async with self.db:
        if hash is None:
            row = await self.db.fetchrow(
                "SELECT * FROM avatars WHERE toshi_id = $1 AND format = $2 ORDER BY last_modified DESC",
                address, format)
        else:
            # Only the prefix length (a module constant) is interpolated;
            # request values are passed as bound parameters.
            row = await self.db.fetchrow(
                "SELECT * FROM avatars WHERE toshi_id = $1 AND format = $2 AND substring(hash for {}) = $3"
                .format(AVATAR_URL_HASH_LENGTH),
                address, format, hash)

    if row is None or row['format'] != format:
        raise HTTPError(404)

    await self.handle_file_response(
        row['img'], "image/{}".format(format.lower()),
        row['hash'], row['last_modified'])
def get_time_permits(self, hash_mpin_id_hex, signature):
    """Request time permits for an M-Pin ID from the local D-TA.

    Generator-style coroutine: the result is delivered by raising
    ``tornado.gen.Return`` with the ``"timePermits"`` value of the JSON
    response.

    Args:
        hash_mpin_id_hex: Sent as the ``hash_mpin_id`` query parameter.
        signature: Sent as the ``signature`` query parameter.

    Raises:
        HTTPError: 500 when the D-TA response reports an error, has an
            empty body, or is not JSON containing ``"timePermits"``.
    """
    # When caching is enabled a random batch size is requested; otherwise
    # only a single permit is asked for.
    if options.cacheTimePermits:
        count = random.randint(PERMITS_MIN, PERMITS_MAX)
    else:
        count = 1

    # Get time permit from the local D-TA
    url = url_concat(
        "{0}/{1}".format(options.DTALocalURL.rstrip("/"), "timePermits"),
        {
            'hash_mpin_id': hash_mpin_id_hex,
            'signature': signature,
            'count': count,
        })
    response = yield self.http_client.fetch(url)

    if response.error:
        log.error("DTA timePermit failed, URL: {0}. Code: {1}, Reason: {2}".format(
            url, response.code, response.reason))
        raise HTTPError(500)

    if not response.body:
        # Previously this fell through and returned None, which only
        # failed later in the caller.
        log.error("DTA /timePermit Failed. Empty response body")
        raise HTTPError(500)

    try:
        time_permits = json.loads(response.body)["timePermits"]
    except (ValueError, KeyError, TypeError):
        # TypeError covers a JSON payload that is not an object.
        log.error("DTA /timePermit Failed. Invalid JSON response: {0}".format(
            response.body))
        raise HTTPError(500)

    raise tornado.gen.Return(time_permits)
def get_time_permit(self, hash_mpin_id_hex, date_epoch, signature):
    """Get time permit from cache or request new.

    Generator-style coroutine; the permit is delivered via
    ``tornado.gen.Return``. Raises ``HTTPError(500)`` when the permits
    fetched from the D-TA do not include ``str(date_epoch)``.
    """
    if options.cacheTimePermits:
        cached = self.storage.find(time_permit_id=hash_mpin_id_hex,
                                   time_permit_date=date_epoch)
        if cached:
            # Cache hit: no request to the D-TA is needed.
            raise tornado.gen.Return(cached.time_permit)

    # Nothing cached for this mpin id: ask the D-TA for new permits.
    fetched = yield self.get_time_permits(hash_mpin_id_hex, signature)
    if options.cacheTimePermits:
        self.cache_time_permits(fetched, hash_mpin_id_hex)

    # Hand back the permit for the requested day.
    day_key = str(date_epoch)
    if day_key in fetched:
        raise tornado.gen.Return(fetched[day_key])

    log.error("DTA /timePermit Failed. No time permit for today")
    raise HTTPError(500)
def get(self, mpin_id):
    """Respond with a time permit for ``mpin_id``.

    When ``options.RPAPermitUserURL`` is set, that URL is queried first and
    any non-200 answer is re-raised as ``HTTPError`` with the same code.
    The response is finished with a dict holding the date, signature,
    storage id, message, time permit and version.
    """
    # Check revocation status of mpin id.
    if options.RPAPermitUserURL:
        permit_url = url_concat(options.RPAPermitUserURL,
                                {"mpin_id": mpin_id})
        response = yield self.http_client.fetch(permit_url,
                                                raise_error=False)
        if response.code != 200:
            # RPA rejects this mpin id
            raise HTTPError(response.code)

    # The id arrives hex-encoded; the hash is taken over its decoded form.
    decoded_id = mpin_id.decode("hex")
    hash_mpin_id_hex = hashlib.sha256(decoded_id).hexdigest()
    today_epoch = secrets.today()
    signature = signMessage(hash_mpin_id_hex, Keys.app_key)
    time_permit = yield self.get_time_permit(
        hash_mpin_id_hex, today_epoch, signature)

    self.set_header("Cache-Control", "no-cache")
    body = {
        "date": today_epoch,
        "signature": signature,
        "storageId": hash_mpin_id_hex,
        'message': "M-Pin Time Permit Generated",
        'timePermit': time_permit,
        'version': VERSION,
    }
    self.finish(body)
def write_error(self, status_code, **kwargs):
    """Write an error response based on the exception in ``exc_info``.

    The exception is used as-is when it is an ``HTTPAPIError``, wrapped as
    a bad-request ``HTTPAPIError`` when it is an ``HTTPError``, and wrapped
    as an internal-server-error ``HTTPAPIError`` otherwise. If anything
    fails here (including a missing ``exc_info``), the failure is logged
    and the parent class's ``write_error`` is used instead.
    """
    # NOTE(review): the fallback return is assumed to sit inside the
    # except branch; the flattened source does not show its indentation.
    try:
        error = kwargs.pop('exc_info')[1]
        if isinstance(error, HTTPAPIError):
            api_error = error
        elif isinstance(error, HTTPError):
            api_error = HTTPAPIError(
                BAD_REQUEST_ERROR, error.log_message, error.status_code)
        else:
            api_error = HTTPAPIError(INTERNAL_SERVER_ERROR, str(error), 500)
        self.set_status(status_code)
        self._async_write(str(api_error))
    except Exception as e:
        LOG.exception(str(e))
        return super(BaseApiHandler, self).write_error(status_code, **kwargs)

# private method
def proxy(self, port, path):
    """Proxy a request to the rsession process, starting it if needed.

    Waits up to five times while ``self.state['starting']`` is set, then
    starts the process when ``'proc'`` is absent from ``self.state`` and
    delegates to the parent class's ``proxy``.

    Args:
        port: Not used; the parent is called with ``self.port``.
        path: Request path; a leading ``/`` is added when missing.

    Raises:
        web.HTTPError: 500 when the process is still starting after all
            waits. The original passed the message positionally together
            with ``status_code=500``, which raised ``TypeError`` instead.
    """
    if not path.startswith('/'):
        path = '/' + path

    # if we're in 'starting' let's wait a while
    for i in range(5):
        if not self.state.get('starting', False):
            break
        # NOTE(review): 1.4 ** i never exceeds 5 for i < 5, so this always
        # waits 5 seconds. If capped exponential backoff was intended this
        # should be min(); left unchanged because it shortens the total wait.
        wait_time = max(1.4 ** i, 5)
        self.log.debug('Waiting {} before checking if rstudio is up'.format(wait_time))
        yield gen.sleep(wait_time)
    else:
        # Reached only when the loop never hit `break`.
        raise web.HTTPError(500, 'could not start rsession in time')

    # FIXME: try to not start multiple processes at a time with some locking here
    if 'proc' not in self.state:
        self.log.info('No existing process rsession process found')
        yield self.start_process()

    return (yield super().proxy(self.port, path))
def get(self, name, path):
    """Forward a request to the named TensorBoard WSGI application.

    An empty ``path`` is permanently redirected to the same URL with a
    trailing slash, keeping the query string. Raises ``web.HTTPError(404)``
    when ``name`` is not in the ``tensorboard_manager`` setting.
    """
    request = self.request
    if path == "":
        target = request.path + "/"
        if request.query:
            target += "?" + request.query
        self.redirect(target, permanent=True)
        return

    # NOTE(review): the query is appended only when it is empty, which
    # looks inverted; preserved as-is, confirm against the WSGI app.
    if request.query:
        request.path = path
    else:
        request.path = "%s?%s" % (path, request.query)

    manager = self.settings["tensorboard_manager"]
    if name not in manager:
        raise web.HTTPError(404)
    WSGIContainer(manager[name].tb_app)(request)
def download(self, fspath, mime_type=None):
    """Send the file at ``fspath`` to the client as an attachment.

    Args:
        fspath: File system path of the file to send.
        mime_type: Content type to report. When ``None`` it is guessed from
            the file name, falling back to ``text/plain``.

    Raises:
        web.HTTPError: 404 when ``fspath`` does not exist.

    A failure while reading or writing the content is reported with
    ``print`` and otherwise ignored, as before. It is now caught as
    ``Exception`` rather than with a bare ``except`` so interpreter-exit
    signals are not swallowed.
    """
    if not os.path.exists(fspath):
        raise web.HTTPError(404)

    if mime_type is None:
        mime_type, _ = mimetypes.guess_type(fspath)
        if mime_type is None:
            mime_type = "text/plain"

    base_filename = os.path.basename(fspath)
    self.set_header('Content-Type', mime_type)
    self.set_header('Content-Disposition',
                    'attachment; filename="%s"' % base_filename)

    # The context manager closes the handle on every exit path.
    with open(fspath, "rb") as fp:
        try:
            self.write(fp.read())
        except Exception:
            print("IO error reading: " + fspath)
def validate_absolute_path(self, root, absolute_path):
    """Validate and return the absolute path.

    Credit:
    https://github.com/tornadoweb/tornado/blob/master/tornado/web.py

    Raises:
        HTTPError: 403 when the path is outside ``root`` or is not a
            regular file; 404 when it does not exist.
    """
    root_prefix = os.path.abspath(root)
    if not root_prefix.endswith(os.path.sep):
        # The trailing separator stops "/root-other" matching "/root".
        root_prefix += os.path.sep

    candidate = absolute_path + os.path.sep
    if not candidate.startswith(root_prefix):
        # Only files under the specified root can be accessed
        raise HTTPError(403, "%s is not in the root directory", self.path)

    if not os.path.exists(absolute_path):
        raise HTTPError(404)

    if not os.path.isfile(absolute_path):
        raise HTTPError(403, "%s is not a file", self.path)

    return absolute_path
def refresh_token(method):
    """Decorate a handler method to refresh the request's token first.

    Before the wrapped method runs, the ``token`` argument is validated.
    When validation returns a non-empty mapping, a new token expiring five
    hours from now is generated and stored in ``self.res['token']``.

    If anything in that step fails, a 401 JSON response with
    ``"Token Refresh Error"`` is written and finished, and
    ``HTTPError(401)`` is raised; the wrapped method is not called.
    Failures are caught as ``Exception`` rather than with a bare ``except``
    so ``SystemExit``/``KeyboardInterrupt`` are not turned into a 401.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            now = JwtToken().validate(self.get_argument('token'))
            if len(now.keys()) != 0:
                # Lifetime of the refreshed token, in seconds.
                TIME_UNUSED = 5*60*60
                t = int(time.time()) + TIME_UNUSED
                j = JwtToken().generate({"uid": now['uid'],
                                         "type": now['type'],
                                         "time": t})
                self.res['token'] = j.decode("utf-8")
        except Exception:
            self.set_status(401)
            self.res['error'] = "Token Refresh Error"
            self.write_json()
            self.finish()
            raise HTTPError(401)
        return method(self, *args, **kwargs)
    return wrapper
def get(self, domain, file_name):
    """Redirect to the download URL stored for ``file_name`` in ``domain``.

    Raises:
        HTTPError: 404 when ``file_name`` is empty, the lookup reports
            failure, or any error occurs during the lookup or redirect.

    The database connection is closed on every path. Errors are caught as
    ``Exception`` rather than with a bare ``except`` so interpreter-exit
    signals are not converted into a 404.
    """
    try:
        if not file_name:
            raise HTTPError(404)
        query_status, query_result = self.mysql_client.raw_sql_fdfs_download(
            file_name.strip(), domain.strip())
        if not query_status:
            # logging.error("file: %s, domain: %s , error: %s" % (file_name, domain, query_result))
            raise HTTPError(404)
        self.redirect(url=query_result, permanent=False, status=None)
    except Exception:
        # Every failure, including the 404s above, is reported as 404.
        raise HTTPError(404)
    finally:
        self.mysql_client.close_connetc()
def post(self):
    """Write the next page of friends' statuses as rendered HTML in JSON.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    The JSON carries the joined ``html`` and the next ``page`` number.
    """
    form = StatusMoreForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    page = form.page.data
    per_page = HOME_SETTINGS['status_number_per_page']
    status_list = yield StatusDocument.get_friends_status_list(
        self.current_user['_id'],
        skip=per_page * page,
        limit=per_page
    )

    fragments = [
        self.render_string(
            'home/template/status/status-list-item.html',
            status=status
        )
        for status in status_list
    ]
    self.write_json({'html': ''.join(fragments), 'page': page + 1})
def post(self):
    """Write the rendered comment list of one status as JSON.

    Raises ``HTTPError(404)`` when the form does not validate or no status
    with the submitted id exists.
    """
    form = StatusCommentsForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    status_id = form.status_id.data
    query = {'_id': ObjectId(status_id)}
    status = yield StatusDocument.find_one(query)
    if not status:
        raise HTTPError(404)

    status_comment_list = yield StatusCommentDocument.get_comment_list(
        status_id, self.current_user['_id']
    )
    rendered = self.render_string(
        'home/template/status/status-comment-list.html',
        status=status,
        status_comment_list=status_comment_list
    )
    self.write_json({'html': rendered})
def get(self):
    """Render the topic form in ``edit`` mode for the requested topic.

    Raises ``HTTPError(404)`` when ``topic_id`` is missing, the topic is
    not found, or its author is not the current user.
    """
    topic_id = self.get_argument('topic_id', None)
    if not topic_id:
        raise HTTPError(404)

    user_id = self.current_user['_id']
    topic = yield TopicDocument.get_topic(topic_id, user_id)
    # Someone else's topic is treated the same as a missing one.
    if not topic or topic['author']['_id'] != self.current_user['_id']:
        raise HTTPError(404)

    self.render(
        'community/template/topic-new.html',
        action="edit",
        topic=topic
    )
def post(self):
    """Write the next page of a topic's comments as rendered HTML in JSON.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    """
    form = TopicCommentMoreForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    page = form.page.data
    topic_id = form.topic_id.data
    per_page = COMMUNITY_SETTINGS['topic_comment_number_per_page']
    comment_list = yield TopicCommentDocument.get_comment_list(
        topic_id, per_page * page, per_page
    )

    fragments = [
        self.render_string(
            'community/template/topic-comment-list-item.html',
            comment=comment
        )
        for comment in comment_list
    ]
    self.write_json({'html': ''.join(fragments), 'page': page + 1})
def post(self):
    """Set the current user's profile cover to an official cover.

    Raises ``HTTPError(404)`` when the form does not validate or the cover
    id matches no official cover.
    """
    form = ProfileCoverSetForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    cover_id = form.profile_cover_id.data
    profile_cover = yield OfficialProfileCoverDocument.find_one(
        {'_id': ObjectId(cover_id)})
    if not profile_cover:
        raise HTTPError(404)

    # The user setting stores a reference to the cover, not a copy.
    collection = OfficialProfileCoverDocument.meta['collection']
    cover = DBRef(collection, ObjectId(profile_cover['_id']))
    yield UserSettingDocument.set_profile_cover(
        self.current_user['_id'], cover
    )
    raise gen.Return()
def post(self):
    """Update the current user's four privacy settings from the form.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    """
    form = PrivateSetForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    changes = {
        'require_verify_when_add_friend':
            form.require_verify_when_add_friend.data,
        'allow_stranger_visiting_profile':
            form.allow_stranger_visiting_profile.data,
        'allow_stranger_chat_with_me':
            form.allow_stranger_chat_with_me.data,
        'enable_leaving_message':
            form.enable_leaving_message.data
    }
    user_ref = DBRef(
        UserDocument.meta['collection'],
        ObjectId(self.current_user['_id'])
    )
    yield UserSettingDocument.update({'user': user_ref}, {'$set': changes})
    self.finish()
def post(self):
    """Update the current user's ``email_notify_when_offline`` setting.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    """
    form = NotificationSetForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    notify = form.email_notify_when_offline.data
    user_ref = DBRef(
        UserDocument.meta['collection'],
        ObjectId(self.current_user['_id'])
    )
    yield UserSettingDocument.update(
        {'user': user_ref},
        {'$set': {'email_notify_when_offline': notify}}
    )
    self.finish()
def post(self):
    """Write the next page of the current user's own statuses as JSON.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    The current user's id is passed as both arguments of the status query.
    """
    form = StatusMoreForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    page = form.page.data
    per_page = PROFILE_SETTINGS['status_number_per_page']
    status_list = yield StatusDocument.get_status_list(
        self.current_user['_id'],
        self.current_user['_id'],
        skip=per_page * page,
        limit=per_page
    )

    fragments = [
        self.render_string(
            'profile/template/status/status-list-item.html',
            status=status
        )
        for status in status_list
    ]
    self.write_json({'html': ''.join(fragments), 'page': page + 1})
def post(self):
    """Write the next page of a user's leave-messages as HTML in JSON.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    """
    form = LeaveMessageMoreForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    page = form.page.data
    user_id = form.user_id.data
    per_page = PROFILE_SETTINGS['leave_message_number_per_page']
    leave_message_list = yield LeaveMessageDocument.get_leave_message_list(
        user_id,
        self.current_user['_id'],
        skip=per_page * page,
        limit=per_page
    )

    fragments = [
        self.render_string(
            'profile/template/leavemessage/leavemessage-list-item.html',
            leave_message=leave_message
        )
        for leave_message in leave_message_list
    ]
    self.write_json({'html': ''.join(fragments), 'page': page + 1})
def post(self):
    """Remove friend-request messages sent by a user to the current user.

    Raises ``HTTPError(404)`` when the submitted form does not validate.
    """
    form = FriendRequestForm(self.request.arguments)
    if not form.validate():
        raise HTTPError(404)

    user_id = form.user_id.data
    users = UserDocument.meta['collection']
    criteria = {
        'sender': DBRef(users, ObjectId(user_id)),
        'recipient': DBRef(users, ObjectId(self.current_user['_id'])),
        'message_type': MessageTopic.FRIEND_REQUEST_NEW
    }
    yield MessageDocument.remove(criteria)
def exception_control(func):
    """Decorate a handler method so its outcome is always sent as JSON.

    Success sends ``(result, E_SUCC, "OK")``. ``MissingArgumentError`` and
    ``AssertionError`` send ``E_PARAM`` with the error text. Any other
    ``tornado.web.HTTPError`` is re-raised. Every remaining exception sends
    ``E_INTER`` and is logged with the request URI, plus the request body
    when the request carries no files.
    """
    @functools.wraps(func)
    def wrapper(self):
        """Run the wrapped handler and send its result or failure."""
        try:
            body = func(self)
        except (MissingArgumentError, AssertionError) as ex:
            code, msg, body = E_PARAM, str(ex), None
        except tornado.web.HTTPError:
            raise
        except Exception as ex:
            code, msg, body = E_INTER, str(ex), None
            request = self.request
            # Uploaded file bodies are kept out of the log line.
            if request.files:
                log_msg = request.uri
            else:
                log_msg = "%s %s" % (request.uri, request.body)
            logging.error(log_msg, exc_info=True)
        else:
            code, msg = E_SUCC, "OK"
        self.send_json(body, code, msg)
    return wrapper
def process_module(self, module):
    """Dispatch a slash-separated module path to a method on this handler.

    The path, prefixed with ``self.module_prefix`` when set, has its
    non-empty segments joined with ``__`` to form the method name; an
    empty name maps to ``index``. A truthy result is sent as JSON, and an
    exception from the method is logged and sent as an ``E_INTER`` error.

    Raises:
        tornado.web.HTTPError: 404 when no such method exists or the name
            is ``get`` or ``post``.
    """
    module = module or ''
    if self.module_prefix:
        module = '%s/%s' % (self.module_prefix, module)
    segments = [part for part in module.split('/') if part]
    module = '__'.join(segments)

    method = getattr(self, module or 'index', None)
    # The HTTP verb methods must not be reachable through this dispatch.
    if not method or module in ('get', 'post'):
        raise tornado.web.HTTPError(404)

    try:
        result = method()
        if result:
            self.send_json(result)
    except Exception as ex:
        logging.error('%s\n%s\n', self.request, str(ex), exc_info=True)
        self.send_json(None, E_INTER, str(ex))
def put(self, section_name):
    """Validate and store the settings sent for ``section_name``.

    The request body is decoded as UTF-8, minified, parsed as JSON and
    validated against the section's schema. The raw body (comments
    included) is then written to the settings file and the status is set
    to 204.

    Raises:
        web.HTTPError: 404 when there is no settings directory; 400 when
            the body is not valid JSON or fails schema validation.
            Malformed JSON previously escaped as an unhandled ``ValueError``.
    """
    if not self.settings_dir:
        raise web.HTTPError(404, "No current settings directory")

    raw = self.request.body.strip().decode(u"utf-8")

    # Validate the data against the schema.
    schema = _get_schema(self.schemas_dir, section_name, self.overrides)
    validator = Validator(schema)
    try:
        payload = json.loads(json_minify(raw))
    except ValueError as e:
        # A body that cannot be parsed is the client's error.
        raise web.HTTPError(400, str(e))
    try:
        validator.validate(payload)
    except ValidationError as e:
        raise web.HTTPError(400, str(e))

    # Write the raw data (comments included) to a file.
    path = _path(self.settings_dir, section_name, _file_extension, True)
    with open(path, "w") as fid:
        fid.write(raw)

    self.set_status(204)
def _path(root_dir, section_name, file_extension = ".json", make_dirs = False): """Parse the URL section name and find the local file system path.""" parent_dir = root_dir # Attempt to parse path, e.g. @jupyterlab/apputils-extension:themes. try: package_dir, plugin = section_name.split(":") parent_dir = os.path.join(root_dir, package_dir) path = os.path.join(parent_dir, plugin + file_extension) # This is deprecated and exists to support the older URL scheme. except: path = os.path.join(root_dir, section_name + file_extension) if make_dirs and not os.path.exists(parent_dir): try: os.makedirs(parent_dir) except Exception as e: name = section_name message = "Failed writing settings ({}): {}".format(name, str(e)) raise web.HTTPError(500, message) return path
def get(self, provider_prefix, spec):
    """Render the index page pre-filled for the given provider and spec.

    Raises:
        web.HTTPError: re-raised unchanged from ``get_provider``; any other
            exception from it is logged and raised as a 400 carrying the
            exception text.
    """
    try:
        provider = self.get_provider(provider_prefix, spec=spec)
    except web.HTTPError:
        raise
    except Exception as e:
        # The format arguments were missing, so the literal "%s/%s" was
        # logged instead of the provider and spec.
        app_log.error("Failed to construct provider for %s/%s",
                      provider_prefix, spec)
        # FIXME: 400 assumes it's the user's fault (?)
        # maybe we should catch a special InvalidSpecError here
        raise web.HTTPError(400, str(e))

    self.render_template(
        "index.html",
        base_url=self.settings['base_url'],
        url=provider.get_repo_url(),
        ref=provider.unresolved_ref,
        filepath=self.get_argument('filepath', None),
        urlpath=self.get_argument('urlpath', None),
        submit=True,
        google_analytics_code=self.settings['google_analytics_code'],
    )
def put(self, bucket, object_name):
    """Store the request body as ``object_name`` inside ``bucket``.

    Args:
        bucket: Bucket directory name under the application directory.
        object_name: URL-quoted object name; it is unquoted before use.

    Raises:
        web.HTTPError: 404 when the bucket directory is outside the
            application directory or does not exist; 403 when the object
            path is outside the bucket or is a directory.

    The file is written inside a ``with`` block so the handle is closed
    even if the write fails.
    """
    object_name = urllib.unquote(object_name)
    bucket_dir = os.path.abspath(os.path.join(
        self.application.directory, bucket))
    # NOTE(review): these are plain prefix tests, so a sibling directory
    # sharing the prefix would pass; confirm whether that is reachable.
    if not bucket_dir.startswith(self.application.directory) or \
            not os.path.isdir(bucket_dir):
        raise web.HTTPError(404)

    path = self._object_path(bucket, object_name)
    if not path.startswith(bucket_dir) or os.path.isdir(path):
        raise web.HTTPError(403)

    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)

    with open(path, "w") as object_file:
        object_file.write(self.request.body)
    self.finish()
def post(self):
    """Download a notebook from a URL into the current working directory.

    The JSON request body supplies ``nb_url``, a base64-encoded ``nb_name``
    and ``force_download``. The notebook name is written back on success.

    Raises:
        HTTPError: 400 when ``nb_name`` resolves outside the working
            directory; 409 when the target file exists and
            ``force_download`` is false.
    """
    # url and filename are sent in a JSON encoded blob
    post_data = tornado.escape.json_decode(self.request.body)
    nb_url = post_data['nb_url']
    nb_name = base64.b64decode(post_data['nb_name']).decode('utf-8')
    force_download = post_data['force_download']

    # nb_name comes from the client: an absolute path or ".." segments
    # would otherwise let the request write anywhere on disk.
    base_dir = os.getcwd()
    file_path = os.path.abspath(os.path.join(base_dir, nb_name))
    root = base_dir if base_dir.endswith(os.sep) else base_dir + os.sep
    if not file_path.startswith(root):
        raise HTTPError(400, "ERROR: Invalid file name.")

    if os.path.isfile(file_path) and not force_download:
        raise HTTPError(409, "ERROR: File already exists.")

    response = request_session.get(nb_url, stream=True)
    with open(file_path, 'wb') as fd:
        # TODO: check if this is a good chunk size
        for chunk in response.iter_content(1024):
            fd.write(chunk)

    self.write(nb_name)
    self.flush()
def get(self):
    """Render the search results page for the ``keywords`` argument.

    Raises ``HTTPError(400)`` when ``keywords`` is missing or empty. Only
    public articles are listed, together with their hit counts and comment
    counts.
    """
    keywords = self.get_argument('keywords', None)
    if not keywords:
        raise HTTPError(400)

    article_ids = KeywordArticle.query_by_keyword(keywords)
    if article_ids:
        articles = Article.get_by_ids(article_ids, public_only=True)
        # Counts are looked up only for the articles actually returned.
        article_ids = [article.id for article in articles]
        hit_counts = ArticleHitCount.get_by_ids(article_ids)
        replies_dict = ArticleComments.get_comment_count_of_articles(article_ids)
    else:
        articles = []
        hit_counts = replies_dict = {}

    self.set_cache(CONFIG.DEFAULT_CACHE_TIME, is_public=True)
    context = {
        'title': u'???%s?' % keywords,
        'page': 'search',
        'keywords': keywords,
        'articles': articles,
        'hit_counts': hit_counts,
        'replies_dict': replies_dict
    }
    self.render('web/search.html', context)
def get(self, tag_name):
    """Render one page of the articles carrying ``tag_name``.

    Raises ``HTTPError(404)`` when the tag does not exist. When the page
    has no articles, the counts are empty and ``next_cursor`` is ``None``.
    """
    if not Tag.exists(tag_name):
        raise HTTPError(404)

    articles, next_cursor = TagArticle.get_articles(tag_name, self.cursor)
    if not articles:
        hit_counts = replies_dict = {}
        next_cursor = None
    else:
        article_ids = [article.id for article in articles]
        hit_counts = ArticleHitCount.get_by_ids(article_ids)
        replies_dict = ArticleComments.get_comment_count_of_articles(article_ids)

    self.set_cache(CONFIG.DEFAULT_CACHE_TIME, is_public=True)
    context = {
        'title': u'???%s?' % tag_name,
        'page': 'tag_articles',
        'next_cursor': next_cursor,
        'tag_name': tag_name,
        'articles': articles,
        'hit_counts': hit_counts,
        'replies_dict': replies_dict
    }
    self.render('web/tag_articles.html', context)
def get(self, category_name):
    """Render one page of the articles in ``category_name``.

    Raises ``HTTPError(404)`` when the category does not exist. When the
    page has no articles, the counts are empty and ``next_cursor`` is
    ``None``.
    """
    if not Category.exists(category_name):
        raise HTTPError(404)

    articles, next_cursor = CategoryArticles.get_articles(
        category_name, self.cursor)
    if not articles:
        hit_counts = replies_dict = {}
        next_cursor = None
    else:
        article_ids = [article.id for article in articles]
        hit_counts = ArticleHitCount.get_by_ids(article_ids)
        replies_dict = ArticleComments.get_comment_count_of_articles(article_ids)

    self.set_cache(CONFIG.DEFAULT_CACHE_TIME, is_public=True)
    context = {
        'title': u'???%s?' % category_name,
        'page': 'category_articles',
        'next_cursor': next_cursor,
        'category_name': category_name,
        'articles': articles,
        'hit_counts': hit_counts,
        'replies_dict': replies_dict
    }
    self.render('web/category_articles.html', context)
def prepare(self):
    """Enforce HTTPS and admin access before the request is handled.

    When HTTPS is enabled and the request is not HTTPS, the client is
    redirected to the ``https://`` form of the same URL (HTTP/1.0 requests
    other than GET/HEAD get a 403 instead). Non-admins are then rejected
    with 403, except that anonymous GET/HEAD requests are redirected to
    the login URL after a state cookie is set.

    NOTE(review): the nesting below is reconstructed from a flattened
    source line; confirm the placement of ``set_session_time_cookie``.
    """
    super(AdminHandler, self).prepare()
    self.set_cache(is_public=False)
    if CONFIG.ENABLE_HTTPS and not self.is_https:
        request = self.request
        if request.version == 'HTTP/1.0':
            # HTTP/1.0 gets the default redirect, for GET/HEAD only.
            if request.method in ('GET', 'HEAD'):
                self.redirect('https://%s%s' % (request.host, request.uri))
            else:
                raise HTTPError(403)
        else:
            # Newer clients are redirected with status 307.
            self.redirect('https://%s%s' % (request.host, request.uri), status=307)
        return
    if not self.is_admin:
        if not self.current_user_id:
            request = self.request
            if request.method in ('GET', 'HEAD'):
                # Anonymous read requests are sent to the login page.
                state = Auth.generate(request.uri)
                self.set_state_cookie(state)
                self.redirect(self.get_login_url(), status=302 if request.version == 'HTTP/1.0' else 303)
                return
            self.set_session_time_cookie()  # force check user status
        raise HTTPError(403)
def authorized(admin_only=False):
    """Build a decorator that restricts a handler method to logged-in users.

    Anonymous GET requests get a state cookie generated from the request
    URI and are redirected to the login URL. Anonymous requests with any
    other method have the session-time cookie set and get a 403. With
    ``admin_only`` set, logged-in non-admins also get a 403. The wrapped
    handler's return value is discarded.
    """
    def wrap(user_handler):
        @wraps(user_handler)
        def authorized_handler(self, *args, **kwargs):
            self.set_cache(is_public=False)
            req = self.request
            if not self.current_user_id:
                if req.method != 'GET':
                    self.set_session_time_cookie()  # force check user status
                    raise HTTPError(403)
                self.set_state_cookie(Auth.generate(req.uri))
                # HTTP/1.0 clients are sent 302 instead of 303.
                code = 302 if req.version == 'HTTP/1.0' else 303
                self.redirect(self.get_login_url(), status=code)
                return
            if admin_only and not self.is_admin:
                raise HTTPError(403)
            user_handler(self, *args, **kwargs)
        return authorized_handler
    return wrap
def get(self, article_id):
    """Render the edit page for the article with the given id.

    Raises ``HTTPError(404)`` when the id converts to zero or no such
    article exists. A non-numeric id makes ``int()`` raise ``ValueError``.
    """
    numeric_id = int(article_id)
    if not numeric_id:
        raise HTTPError(404)

    article = Article.get_by_id(numeric_id)
    if not article:
        raise HTTPError(404)

    categories = Category.get_all_names_with_paths()
    tags = Tag.get_all()
    context = {
        'title': u'???%s?' % article.title,
        'page': 'edit_article',
        'article': article,
        'categories': categories,
        'tags': sorted(tags)
    }
    self.render('admin/edit_article.html', context)
def session(self):
    """Return the session id, creating one when the cookies are missing.

    When both the ``session_id`` and ``verification`` cookies are present,
    the verification value must equal the HMAC generated for the session
    id. Otherwise a new id and HMAC are generated and stored as secure
    cookies.

    Raises:
        HTTPError: 403 when the verification cookie does not match.
    """
    session_id = self.get_secure_cookie('session_id')
    hmac_key = self.get_secure_cookie('verification')

    if session_id and hmac_key:
        if hmac_key != self._generate_hmac(session_id):
            raise HTTPError(403)
        return session_id

    # One or both cookies are missing: start a fresh session.
    session_id = self._generate_id()
    hmac_key = self._generate_hmac(session_id)
    self.set_secure_cookie('session_id', session_id)
    self.set_secure_cookie('verification', hmac_key)
    return session_id
def get(self, operation):
    """Dispatch a GET request to ``get_object`` or ``list_objects``.

    An empty ``operation`` defaults to ``'get'``. The selected method's
    result is yielded and the value sent back into the generator is
    written to the response.

    Raises:
        HTTPError: 404 when ``operation`` is not ``'get'`` or ``'list'``.
    """
    if not operation:
        operation = 'get'

    operations = {'get': self.get_object,
                  'list': self.list_objects}
    if operation not in operations:
        raise HTTPError(404)

    # The lookup was made on the `operation` string itself, which has no
    # `.get`, so every valid request raised AttributeError.
    method = operations[operation]
    ret = yield method()
    self.write(ret)
def put(self, operation):
    """Dispatch a PUT request to ``put_post`` or ``put_comment``.

    An empty ``operation`` defaults to ``'putpost'``; an unknown one raises
    ``HTTPError(404)``. The selected method's result is yielded and the
    value sent back into the generator is written to the response.
    """
    name = operation if operation else 'putpost'
    table = {'putpost': self.put_post,
             'putcomment': self.put_comment}
    if name not in table:
        raise HTTPError(404)
    action = table[name]
    outcome = yield action()
    self.write(outcome)
def get(self, operation):
    """Dispatch a GET request to ``get_object`` or ``list_objects``.

    An empty ``operation`` defaults to ``'get'``; an unknown one raises
    ``HTTPError(404)``. The selected method's result is yielded and the
    value sent back into the generator is written to the response.
    """
    dispatch = {'get': self.get_object,
                'list': self.list_objects}
    try:
        action = dispatch[operation or 'get']
    except KeyError:
        raise HTTPError(404)
    outcome = yield action()
    self.write(outcome)
def error_status(exception):
    """Map an exception to the HTTP status code to respond with.

    ``web.HTTPError`` yields its own ``status_code``; ``ExecutionError``
    and ``GraphQLError`` yield 400; anything else yields 500.
    """
    if isinstance(exception, web.HTTPError):
        return exception.status_code
    if isinstance(exception, (ExecutionError, GraphQLError)):
        return 400
    return 500
def error_format(exception):
    """Convert an exception into a list of error dicts for a JSON body.

    ``ExecutionError`` gives one ``{'message': ...}`` per contained error,
    ``GraphQLError`` gives its formatted form, ``web.HTTPError`` gives its
    log message and reason, and anything else gives a generic message.
    """
    if isinstance(exception, ExecutionError):
        messages = []
        for err in exception.errors:
            messages.append({'message': err})
        return messages
    if isinstance(exception, GraphQLError):
        return [format_graphql_error(exception)]
    if isinstance(exception, web.HTTPError):
        entry = {'message': exception.log_message,
                 'reason': exception.reason}
        return [entry]
    return [{'message': 'Unknown server error'}]