我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.HTTPError()。
def login_required(perm=None):
    """Build a decorator that only runs a view for an authenticated session.

    Args:
        perm: Optional permission name. When given, the mapping returned by
            ``parse_permissions`` for the session must contain a truthy
            entry under this name.

    Returns:
        A decorator that wraps a view function.
    """
    def _dec(func):
        def _view(*args, **kwargs):
            def _reject(target):
                # AJAX callers get a 403 object; everyone else is redirected.
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return HTTPError(403, "Forbidden")
                return redirect(target)

            session = request.environ.get('beaker.session')
            if not (session.get("name", None) and session.get("authenticated", False)):
                return _reject("/login")

            if perm:
                granted = parse_permissions(session)
                if perm not in granted or not granted[perm]:
                    return _reject("/nopermission")

            return func(*args, **kwargs)
        return _view
    return _dec
def enforce_api_rate_limit(action, limit_in_window):
    """Abort the current request with 429 when the caller is over its limits.

    Load-test requests (when the load-test flag is enabled) and organizer
    users are exempt and return immediately.

    Args:
        action: Action name used as the counter key.
        limit_in_window: Maximum number of requests of this action in a
            window.

    Raises:
        bottle.HTTPError: Via ``bottle.abort`` if either limit is exceeded.
    """
    is_load_test = (
        FLAGS.enable_load_test_hacks and
        bottle.request.headers.get('X-Load-Test', '') == 'yes')
    if is_load_test or get_current_user()['organizer']:
        return

    username = get_current_username()

    # Recording the access happens on every non-exempt call, before the
    # per-action counter is touched.
    measured_interval = model.record_last_api_access_time(username)
    if measured_interval < FLAGS.api_rate_limit_request_interval:
        bottle.abort(429, 'Rate limit exceeded (per-second limit).')

    if model.increment_api_rate_limit_counter(username, action) > limit_in_window:
        bottle.abort(429, 'Rate limit exceeded (per-hour limit).')


# http://flask.pocoo.org/snippets/44/
def addcrypted():
    """Store the posted DLC container on disk and add it as a package.

    Reads the ``referer`` (package name, with a default) and ``crypted``
    form fields, writes the container to ``DL_ROOT`` and hands the file to
    ``PYLOAD.addPackage``.

    Returns:
        ``"success\\r\\n"`` on success, or an ``HTTPError`` object when
        adding the package fails.
    """
    package = request.forms.get('referer', 'ClickAndLoad Package')
    # Presumably restores '+' characters that form decoding turned into
    # spaces -- confirm against the sender.
    dlc = request.forms['crypted'].replace(" ", "+")

    # Strip path separators and drive colons so the name stays inside DL_ROOT.
    file_stem = package.replace("/", "").replace("\\", "").replace(":", "")
    dlc_path = join(DL_ROOT, file_stem + ".dlc")

    # The context manager closes the handle even if the write fails.
    with open(dlc_path, "wb") as dlc_file:
        dlc_file.write(dlc)

    try:
        PYLOAD.addPackage(package, [dlc_path], 0)
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError()
    return "success\r\n"
def links():
    """Return the current downloads with a display-ready ``info`` field.

    Returns:
        A dict with ``links`` (list of download dicts) and ``ids`` (their
        ``fid`` values), or an ``HTTPError`` object if anything fails.
    """
    try:
        downloads = [toDict(x) for x in PYLOAD.statusDownloads()]
        for link in downloads:
            status = link['status']
            if status == 12:
                # Status 12: show ETA and speed.
                link['info'] = "%s @ %s/s" % (link['format_eta'], formatSize(link['speed']))
            elif status == 5:
                # Status 5: show the wait time and zero out the progress fields.
                link['percent'] = 0
                link['size'] = 0
                link['bleft'] = 0
                link['info'] = _("waiting %s") % link['format_wait']
            else:
                link['info'] = ""

        return {'links': downloads, 'ids': [link['fid'] for link in downloads]}
    except Exception:
        # ``except Exception, e`` was Python 2-only syntax and ``e`` was unused.
        print_exc()
        return HTTPError()
def test_dispatch_error(self, app, view_render_errors):
    """An HTTPError raised in dispatch is rendered per ``render_errors``."""
    @app.routecv
    class ExampleView(ExampleErrorView):
        render_errors = view_render_errors
        renderer_classes = [JSONRenderer]

    resp = app.webtest.get('/error', expect_errors=True)

    # The status is the same whichever component renders the error.
    assert resp.status == '418 Teapot'
    if view_render_errors:
        assert resp.body == b'[1, 2, 3]'
        expected_content_type = 'application/json'
    else:
        assert b'DOCTYPE HTML PUBLIC' in resp.body
        expected_content_type = 'text/html; charset=UTF-8'
    assert resp.headers['Content-Type'] == expected_content_type
def tasks_view(task_id):
    """Return one task, with guest, errors and sample details, via ``jsonize``.

    Args:
        task_id: Identifier passed to ``db.view_task``.

    Returns:
        The ``jsonize``-d ``{"task": ...}`` document, or a 404 ``HTTPError``
        object when the task lookup returns nothing.
    """
    task = db.view_task(task_id, details=True)
    if not task:
        return HTTPError(404, "Task not found")

    entry = task.to_dict()
    entry["guest"] = task.guest.to_dict() if task.guest else {}
    entry["errors"] = [error.message for error in task.errors]
    entry["sample"] = (
        db.view_sample(task.sample_id).to_dict() if task.sample_id else {}
    )
    return jsonize({"task": entry})
def task_screenshots(task=0, screenshot=None):
    """Serve one screenshot of an analysis, or all of them as a zip.

    Args:
        task: Task identifier used to locate the ``shots`` folder.
        screenshot: Screenshot name without the ``.jpg`` extension. When
            falsy, every file in the folder is zipped (stored, not
            compressed).

    Returns:
        JPEG bytes, zip bytes, or a 404 ``HTTPError`` object whose body is
        the path that was not found.
    """
    folder_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task), "shots")
    if not os.path.exists(folder_path):
        return HTTPError(404, folder_path)

    if screenshot:
        screenshot_name = "{0}.jpg".format(screenshot)
        screenshot_path = os.path.join(folder_path, screenshot_name)
        if not os.path.exists(screenshot_path):
            return HTTPError(404, screenshot_path)

        # TODO: Add content disposition.
        response.content_type = "image/jpeg"
        # Close the handle deterministically instead of leaking it.
        with open(screenshot_path, "rb") as shot_file:
            return shot_file.read()

    zip_data = StringIO()
    with ZipFile(zip_data, "w", ZIP_STORED) as zip_file:
        for shot_name in os.listdir(folder_path):
            zip_file.write(os.path.join(folder_path, shot_name), shot_name)

    # TODO: Add content disposition.
    response.content_type = "application/zip"
    return zip_data.getvalue()
def get_files(task_id):
    """Zip the ``files`` folder of an analysis and return the archive bytes.

    Args:
        task_id: Task identifier as a string; must consist of digits only.

    Returns:
        The zip archive bytes, or a 404 ``HTTPError`` object when the ID is
        not numeric or the files folder does not exist.
    """
    if not task_id.isdigit():
        return HTTPError(code=404, output="The specified ID is invalid")

    analysis_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id)
    files_path = os.path.join(analysis_path, "files")
    zip_file = os.path.join(analysis_path, "files.zip")

    # Check before building the archive: previously this ran after the zip
    # was written, so a missing folder left an empty files.zip behind (or
    # crashed when the analysis directory itself was absent).
    if not os.path.exists(files_path):
        return HTTPError(code=404, output="Files not found")

    with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        root_len = len(os.path.abspath(files_path))
        for root, _dirs, files in os.walk(files_path):
            # Path of the current directory relative to files_path.
            archive_root = os.path.abspath(root)[root_len:]
            for f in files:
                fullpath = os.path.join(root, f)
                archive_name = os.path.join(archive_root, f)
                archive.write(fullpath, archive_name, zipfile.ZIP_DEFLATED)

    response.content_type = "application/zip"
    response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id))
    with open(zip_file, "rb") as archive_file:
        return archive_file.read()
def audio_random(db):
    """Return one randomly chosen type-2 page as a JSON string.

    Args:
        db: Session-like object exposing ``query``.

    Returns:
        A JSON string describing the page, or a 404 ``HTTPError`` object
        when the query yields no rows.
    """
    candidates = list(db.query(Page).filter_by(type=2).limit(10))
    # random.choice raises IndexError on an empty sequence, so test for
    # emptiness first; otherwise the 404 below is never reached.
    if not candidates:
        return HTTPError(404, 'id not found.')

    audio = random.choice(candidates)
    return json.dumps({
        'id': audio.id,
        'title': audio.title,
        'cover': audio.cover,
        'para': audio.para,
        'type': audio.type,
        'tag': audio.tag,
        'author': {
            'id': audio.author.id,
            'name': audio.author.name
        },
        'time': audio.time
    })
def native_latest_news(db):
    """Return the ten newest type-10 pages as a JSON array string.

    Args:
        db: Session-like object exposing ``query``.

    Returns:
        A JSON string, or a 404 ``HTTPError`` object when there are no rows.
    """
    # Materialise the rows: the old check tested the query object itself
    # (presumably a SQLAlchemy Query, which is always truthy), so the 404
    # branch could never run. Also avoids shadowing the builtin ``all``.
    pages = list(db.query(Page).filter_by(type=10).order_by(desc(Page.id)).limit(10))
    if not pages:
        return HTTPError(404, 'id not found.')

    return json.dumps([{
        'id': page.id,
        'key': page.id,
        'title': page.title,
        'cover': page.cover,
        'type': page.type,
        'tag': page.tag,
        'author': {
            'id': page.author.id,
            'name': page.author.name
        },
        'time': page.time
    } for page in pages])
def latest_news(db):
    """Return the ten newest type-0 pages as a JSON array string.

    Args:
        db: Session-like object exposing ``query``.

    Returns:
        A JSON string, or a 404 ``HTTPError`` object when there are no rows.
    """
    # Materialise the rows: the old check tested the query object itself
    # (presumably a SQLAlchemy Query, which is always truthy), so the 404
    # branch could never run. Also avoids shadowing the builtin ``all``.
    pages = list(db.query(Page).filter_by(type=0).order_by(desc(Page.id)).limit(10))
    if not pages:
        return HTTPError(404, 'id not found.')

    return json.dumps([{
        'id': page.id,
        'key': page.id,
        'title': page.title,
        'cover': page.cover,
        'type': page.type,
        'tag': page.tag,
        'author': {
            'id': page.author.id,
            'name': page.author.name
        },
        'time': page.time
    } for page in pages])
def anchor_news(anchor, db):
    """Return up to ten type-0 pages with an id below ``anchor`` as JSON.

    Args:
        anchor: Upper bound (exclusive) on ``Page.id``.
        db: Session-like object exposing ``query``.

    Returns:
        A JSON string, newest first, or a 404 ``HTTPError`` object when
        there are no rows.
    """
    query = (db.query(Page)
             .filter_by(type=0)
             .order_by(desc(Page.id))
             .filter(Page.id < anchor)
             .limit(10))
    # Materialise the rows: the old check tested the query object itself
    # (presumably a SQLAlchemy Query, which is always truthy), so the 404
    # branch could never run. Also avoids shadowing the builtin ``all``.
    pages = list(query)
    if not pages:
        return HTTPError(404, 'id not found.')

    return json.dumps([{
        'id': page.id,
        'key': page.id,
        'title': page.title,
        'cover': page.cover,
        'type': page.type,
        'tag': page.tag,
        'author': {
            'id': page.author.id,
            'name': page.author.name
        },
        'time': page.time
    } for page in pages])
def ensure_admin():
    """Reject the request unless ``is_admin()`` reports an admin client.

    Raises:
        bottle.HTTPError: 401 with a Basic ``WWW-Authenticate`` challenge
            when the client is not an admin.
    """
    if is_admin():
        return

    denied = bottle.HTTPError(401, 'Unauthorized')
    denied.headers['WWW-Authenticate'] = 'Basic realm="admin only" domain="/"'
    raise denied
def json_api_handler(handler):
    """Bottle handler decorator for JSON REST API handlers.

    The wrapped handler must return a dict (serialised with ``ok: True``) or
    raise/return a non-2xx ``bottle.HTTPResponse`` (serialised as
    ``{'ok': False, 'error': <body>}``). Any other exception is logged and
    reported as a 500.

    Args:
        handler: A Bottle handler function.

    Returns:
        A wrapped Bottle handler function.
    """
    @functools.wraps(handler)
    def wrapped_handler(*args, **kwargs):
        try:
            handler_result = handler(*args, **kwargs)
        except bottle.HTTPResponse as raised_response:
            # Rebind explicitly: Python 3 unbinds the ``as`` target when the
            # except block ends, so reusing ``handler_result`` as the target
            # left it undefined below.
            handler_result = raised_response
        except Exception:
            logging.exception('Uncaught exception')
            handler_result = bottle.HTTPError(500, 'Internal Server Error')

        if isinstance(handler_result, bottle.HTTPResponse):
            # For now, we do not support raising successful HTTPResponse.
            assert handler_result.status_code // 100 != 2
            # Forcibly convert HTTPError to HTTPResponse to avoid formatting.
            response = handler_result.copy(cls=bottle.HTTPResponse)
            response_data = {'ok': False, 'error': handler_result.body}
        else:
            assert isinstance(handler_result, dict)
            response = bottle.response.copy(cls=bottle.HTTPResponse)
            response_data = handler_result
            response_data['ok'] = True

        response.body = ujson.dumps(response_data, double_precision=6)
        response.content_type = 'application/json'
        return response
    return wrapped_handler
def secret():
    """Greet the logged-in user, or answer with a Basic-auth challenge.

    Returns:
        A welcome HTML string when ``session.user`` is set, otherwise a 401
        ``HTTPError`` object carrying a ``WWW-Authenticate`` header.
    """
    if session.user:
        return 'Welcome %s! <a href="/">Go back</a> (note that there is no reliable way to logout using basic auth)' % session.user

    challenge = bottle.HTTPError(401, "Login required")
    challenge.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
    return challenge
def secret():
    """Report the authenticated user, or answer with a Basic-auth challenge.

    Returns:
        A plain-text message when ``session.user`` is set, otherwise a 401
        ``HTTPError`` object carrying a ``WWW-Authenticate`` header.
    """
    if session.user:
        return 'You are authenticated as ' + str(session.user)

    challenge = bottle.HTTPError(401, "Login required")
    challenge.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
    return challenge
def index():
    """Greet the logged-in user, or answer with a Basic-auth challenge.

    Returns:
        A greeting string when ``can.session.user`` is set, otherwise a 401
        ``HTTPError`` object carrying a ``WWW-Authenticate`` header.
    """
    if not can.session.user:
        challenge = bottle.HTTPError(401, "Login required")
        challenge.add_header('WWW-Authenticate', 'Basic realm="%s"' % 'private')
        return challenge

    return "Hi " + str(can.session.user) + "!"
def get_certificate():
    """Generate a certificate and return the OpenVPN config as a download.

    Reads ``username``, ``password`` and ``days`` through ``post_get``.

    Returns:
        A 200 ``bottle.HTTPResponse`` with the config as an attachment.

    Raises:
        bottle.HTTPError: 500 when ``username`` is missing, or when ``days``
            is missing or not an integer.
    """
    username = post_get('username')
    if not username:
        raise bottle.HTTPError(500, "Username missing")
    password = post_get('password', default=None)

    # A missing 'days' used to reach int(None) and escape as an uncaught
    # TypeError. Report it explicitly; the status stays 500 to match the
    # username check and what clients already saw for this failure.
    raw_days = post_get('days', default=None)
    try:
        days = int(raw_days)
    except (TypeError, ValueError):
        raise bottle.HTTPError(500, "Days missing or invalid")

    cert_gen = CertificateGenerator()
    cert_gen.generate(password, username, days)
    openvpn_config = cert_gen.get_openvpn_config()

    headers = {
        'Content-Type': 'text/plain;charset=UTF-8',
        'Content-Disposition': 'attachment; filename="softfire-vpn_%s.ovpn"' % username,
        # NOTE(review): len() counts characters if the config is text, not
        # bytes -- confirm the config is ASCII-only.
        "Content-Length": len(openvpn_config)
    }
    return bottle.HTTPResponse(openvpn_config, 200, **headers)
def server_static(filename):
    """Serve a css/static file from the configured view path.

    Args:
        filename: Requested path relative to the static root.

    Returns:
        The ``bottle.static_file`` result, or a 403 ``HTTPError`` object
        when the name contains ``..``.
    """
    if ".." not in filename:
        static_root = '%s/static' % get_config('api', 'view-path', '/etc/softfire/views')
        return bottle.static_file(filename, root=static_root)
    return HTTPError(status=403)


#########
# Utils #
#########
def call_api(func, args=""):
    """Authenticate the session, then dispatch an API call by name.

    Args:
        func: Name of the API function to call.
        args: Slash-separated positional arguments; the segment before the
            first slash is dropped.

    Returns:
        The ``callApi`` result, or an ``HTTPError`` object: 403 when the
        session is missing or unauthenticated, 401 when ``PYLOAD`` denies
        the call, 500 when the call raises.
    """
    response.headers.replace("Content-type", "application/json")
    response.headers.append("Cache-Control", "no-cache, must-revalidate")

    s = request.environ.get('beaker.session')
    # A session id posted in the form overrides the cookie session.
    if 'session' in request.POST:
        s = s.get_by_id(request.POST['session'])

    if not s or not s.get("authenticated", False):
        return HTTPError(403, json.dumps("Forbidden"))

    if not PYLOAD.isAuthorized(func, {"role": s["role"], "permission": s["perms"]}):
        return HTTPError(401, json.dumps("Unauthorized"))

    args = args.split("/")[1:]
    kwargs = {}

    for x, y in chain(request.GET.iteritems(), request.POST.iteritems()):
        if x == "session":
            continue
        kwargs[x] = unquote(y)

    try:
        return callApi(func, *args, **kwargs)
    except Exception as e:
        # ``except Exception, e`` is Python 2-only; the ``as`` form is
        # accepted from Python 2.6 on.
        print_exc()
        # NOTE(review): this sends the full traceback to the client, which
        # exposes internals -- confirm that is intended.
        return HTTPError(500, json.dumps({"error": e.message, "traceback": format_exc()}))
def callApi(func, *args, **kwargs): if not hasattr(PYLOAD.EXTERNAL, func) or func.startswith("_"): print "Invalid API call", func return HTTPError(404, json.dumps("Not Found")) result = getattr(PYLOAD, func)(*[literal_eval(x) for x in args], **dict([(x, literal_eval(y)) for x, y in kwargs.iteritems()])) # null is invalid json response if result is None: result = True return json.dumps(result, cls=TBaseEncoder) #post -> username, password
def local_check(function):
    """Decorate a view so that only local requests reach it.

    A request counts as local when ``REMOTE_ADDR`` is ``127.0.0.1`` or
    ``localhost``, or when ``HTTP_HOST`` is ``127.0.0.1:9666``.

    Args:
        function: The view to protect.

    Returns:
        A wrapper that calls ``function`` for local requests and returns a
        403 ``HTTPError`` object otherwise.
    """
    def _view(*args, **kwargs):
        environ = request.environ
        remote_addr = environ.get('REMOTE_ADDR', "0")
        http_host = environ.get('HTTP_HOST','0')

        is_local = remote_addr in ('127.0.0.1', 'localhost') or http_host == '127.0.0.1:9666'
        if not is_local:
            return HTTPError(403, "Forbidden")
        return function(*args, **kwargs)
    return _view
def flashgot():
    """Accept links posted by the FlashGot endpoint and queue them.

    Only requests whose Referer is the local ``/flashgot`` page are
    accepted. With a ``package`` form field the URLs are added as that
    package; otherwise packages are generated from the URLs.

    Returns:
        An empty string on success, or an ``HTTPError`` object when the
        Referer is missing or not allowed.
    """
    allowed_referers = ("http://localhost:9666/flashgot",
                        "http://127.0.0.1:9666/flashgot")
    # .get(): a request with no Referer header used to raise KeyError here
    # instead of taking the rejection path.
    if request.environ.get('HTTP_REFERER') not in allowed_referers:
        return HTTPError()

    autostart = int(request.forms.get('autostart', 0))
    package = request.forms.get('package', None)
    # Drop the empty entries produced by blank lines.
    urls = [url for url in request.forms['urls'].split("\n") if url != ""]

    if package:
        PYLOAD.addPackage(package, urls, autostart)
    else:
        PYLOAD.generateAndAddPackages(urls, autostart)

    return ""
def packages():
    """Return the queue with each package's file info attached as ``links``.

    Returns:
        The data from ``PYLOAD.getQueue()`` with a ``links`` list added to
        every package, or an ``HTTPError`` object if anything fails.
    """
    print("/json/packages")
    try:
        data = PYLOAD.getQueue()
        for package in data:
            package['links'] = [
                PYLOAD.get_file_info(pyfile)
                for pyfile in PYLOAD.get_package_files(package['id'])
            ]
        return data
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError()
def package(id):
    """Return one package's data with an icon on each link, links sorted.

    Args:
        id: Package identifier passed to ``PYLOAD.getPackageData``.

    Returns:
        The package dict, with ``links`` converted to dicts, given an
        ``icon`` each and sorted by ``get_sort_key``; or an ``HTTPError``
        object if anything fails.
    """
    # Status code -> icon file; any status not listed gets the downloading icon.
    icon_by_status = {
        0: "status_finished.png",
        2: "status_queue.png",
        3: "status_queue.png",
        9: "status_offline.png",
        1: "status_offline.png",
        5: "status_waiting.png",
        8: "status_failed.png",
        4: "arrow_right.png",
        11: "status_proc.png",
        13: "status_proc.png",
    }
    try:
        data = toDict(PYLOAD.getPackageData(id))
        data["links"] = [toDict(x) for x in data["links"]]

        for pyfile in data["links"]:
            pyfile["icon"] = icon_by_status.get(pyfile["status"], "status_downloading.png")

        data["links"].sort(key=get_sort_key)
        return data
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        print_exc()
        return HTTPError()
def package_order(ids):
    """Move a package to a new position in its queue.

    Args:
        ids: String of the form ``"<package id>|<position>"``.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` object when the
        string cannot be parsed or the reorder fails.
    """
    try:
        pid, pos = ids.split("|")
        PYLOAD.orderPackage(int(pid), int(pos))
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError()
    return {"response": "success"}
def abort_link(id):
    """Stop the download with the given id.

    Args:
        id: Identifier passed to ``PYLOAD.stopDownloads`` in a one-item list.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` object on failure.
    """
    try:
        PYLOAD.stopDownloads([id])
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError()
    return {"response": "success"}
def move_package(dest, id):
    """Move a package to another destination.

    Args:
        dest: Destination passed through to ``PYLOAD.movePackage``.
        id: Package identifier.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` object on failure.
    """
    try:
        PYLOAD.movePackage(dest, id)
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError()
    return {"response": "success"}
def edit_package():
    """Update a package's name, folder and password from the posted form.

    Reads ``pack_id``, ``pack_name``, ``pack_folder`` and ``pack_pws``.

    Returns:
        ``{"response": "success"}``, or an ``HTTPError`` object when a field
        is missing or invalid, or the update fails.
    """
    def _field(name):
        # Form values are decoded as UTF-8, dropping undecodable bytes.
        return request.forms.get(name).decode("utf8", "ignore")

    try:
        # Named pack_id so the builtin ``id`` is not shadowed.
        pack_id = int(request.forms.get("pack_id"))
        data = {"name": _field("pack_name"),
                "folder": _field("pack_folder"),
                "password": _field("pack_pws")}

        PYLOAD.setPackageData(pack_id, data)
        return {"response": "success"}
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError()
def change_password():
    """Change a user's password using the posted login form fields.

    Returns:
        ``None`` when ``PYLOAD.changePassword`` succeeds, otherwise an
        ``HTTPError`` object.
    """
    login = request.POST["user_login"]
    current = request.POST["login_current_password"]
    replacement = request.POST["login_new_password"]

    if PYLOAD.changePassword(login, current, replacement):
        return None

    print("Wrong password")
    return HTTPError()
def js_dynamic(path):
    """Serve a JavaScript file, rendering it as a template unless static.

    Sets caching headers (two-day expiry) and a JavaScript content type
    before serving.

    Args:
        path: Path of the script below the ``js`` directory.

    Returns:
        The rendered template or the ``static_file`` result, or a 404
        ``HTTPError`` object when serving fails.
    """
    expires_at = time.gmtime(time.time() + 60 * 60 * 24 * 2)
    response.headers['Expires'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", expires_at)
    response.headers['Cache-control'] = "public"
    response.headers['Content-Type'] = "text/javascript; charset=UTF-8"

    try:
        # static files are not rendered
        if "static" not in path and "mootools" not in path:
            t = env.get_template("js/%s" % path)
            return t.render()
        else:
            return static_file(path, root=join(PROJECT_DIR, "media", "js"))
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return HTTPError(404, "Not Found")
def test_chunked_not_terminated(self):
    """One complete chunk with nothing after it; HTTPError is the expectation."""
    unterminated_body = '1\r\nx\r\n'
    self._test_chunked(unterminated_body, HTTPError)
def test_chunked_wrong_size(self):
    """Declared chunk size 2 but only one data byte; HTTPError is the expectation."""
    short_chunk_body = '2\r\nx\r\n'
    self._test_chunked(short_chunk_body, HTTPError)
def test_chunked_illegal_size(self):
    """Chunk size field is the non-hex 'x'; HTTPError is the expectation."""
    bad_size_body = 'x\r\nx\r\n'
    self._test_chunked(bad_size_body, HTTPError)
def test_chunked_not_chunked_at_all(self):
    """Body without any chunk framing; HTTPError is the expectation."""
    plain_body = 'abcdef'
    self._test_chunked(plain_body, HTTPError)
def test_json_tobig(self):
    """ Environ: Request.json property with huge body. """
    # Payload holds a string of MEMFILE_MAX characters, so the encoded
    # body is longer than MEMFILE_MAX.
    payload = json_dumps(dict(a=5, tobig='x' * bottle.BaseRequest.MEMFILE_MAX))

    environ = {'CONTENT_TYPE': 'application/json'}
    wsgiref.util.setup_testing_defaults(environ)
    body_stream = environ['wsgi.input']
    body_stream.write(tob(payload))
    body_stream.seek(0)
    environ['CONTENT_LENGTH'] = str(len(payload))

    self.assertRaises(HTTPError, lambda: BaseRequest(environ).json)
def testBasic(self):
    """Old-style (``:name``) rules match the expected paths and arguments."""
    # (rule, path, expected url arguments), checked in this order.
    cases = [
        ('/static', '/static', {}),
        ('/\\:its/:#.+#/:test/:name#[a-z]+#/', '/:its/a/cruel/world/',
         dict(test='cruel', name='world')),
        ('/:test', '/test', dict(test='test')),  # No tail
        (':test/', 'test/', dict(test='test')),  # No head
        ('/:test/', '/test/', dict(test='test')),  # Middle
        (':test', 'test', dict(test='test')),  # Full wildcard
        ('/:#anon#/match', '/anon/match', {}),  # Anon wildcards
    ]
    for rule, path, expected in cases:
        self.assertMatches(rule, path, **expected)

    self.assertRaises(bottle.HTTPError, self.match, '//no/m/at/ch/')
def testNewSyntax(self): self.assertMatches('/static', '/static') self.assertMatches('/\\<its>/<:re:.+>/<test>/<name:re:[a-z]+>/', '/<its>/a/cruel/world/', test='cruel', name='world') self.assertMatches('/<test>', '/test', test='test') # No tail self.assertMatches('<test>/', 'test/', test='test') # No head self.assertMatches('/<test>/', '/test/', test='test') # Middle self.assertMatches('<test>', 'test', test='test') # Full wildcard self.assertMatches('/<:re:anon>/match', '/anon/match') # Anon wildcards self.assertRaises(bottle.HTTPError, self.match, '//no/m/at/ch/')
def testValueErrorInFilter(self):
    """A custom filter using ``int`` matches '5' and rejects 'noint'."""
    def int_filter(_conf):
        # Triple handed to add_filter: '.*' pattern plus ``int`` twice.
        return ('.*', int, int)

    self.r.add_filter('test', int_filter)
    self.assertMatches('/int/<i:test>', '/int/5', i=5) # No tail
    self.assertRaises(bottle.HTTPError, self.match, '/int/noint')
def testIntFilter(self):
    """The ``int`` filter yields an int and rejects a non-numeric segment."""
    rule = '/object/<id:int>'
    self.assertMatches(rule, '/object/567', id=567)

    non_numeric_path = '/object/abc'
    self.assertRaises(bottle.HTTPError, self.match, non_numeric_path)
def testFloatFilter(self):
    """The ``float`` filter accepts several numeric spellings and rejects others."""
    rule = '/object/<id:float>'

    accepted = (
        ('/object/1', 1),
        ('/object/1.1', 1.1),
        ('/object/.1', 0.1),
        ('/object/1.', 1),
    )
    for path, value in accepted:
        self.assertMatches(rule, path, id=value)

    for rejected_path in ('/object/abc', '/object/', '/object/.'):
        self.assertRaises(bottle.HTTPError, self.match, rejected_path)
def test_401(self):
    """ WSGI: abort(401, '') (HTTP 401) """
    @bottle.route('/')
    def aborting_route():
        bottle.abort(401)

    # With no custom handler registered the 401 is returned as is.
    self.assertStatus(401, '/')

    @bottle.error(401)
    def on_unauthorized(error):
        # The handler overrides the status and reports the error's type.
        bottle.response.status = 200
        return str(type(error))

    self.assertStatus(200, '/')
    self.assertBody("<class 'bottle.HTTPError'>",'/')
def test_view_error(self):
    """ WSGI: Test if view-decorator reacts on non-dict return values correctly."""
    @bottle.route('/tpl')
    @bottle.view('stpl_t2main')
    def view_returning_error():
        # Returns an HTTPError object instead of a template dict.
        return bottle.HTTPError(401, 'The cake is a lie!')

    target = '/tpl'
    self.assertInBody('The cake is a lie!', target)
    self.assertInBody('401 Unauthorized', target)
    self.assertStatus(401, target)
def dispatch(self):
    """Always raise an HTTPError with status '418 Teapot' and a list body."""
    status_line = '418 Teapot'
    payload = [1,2,3]
    raise HTTPError(status_line, payload)
def tasks_create_file():
    """Create analysis task(s) from an uploaded file and posted options.

    Returns:
        The ``jsonize``-d ``{"task_ids": ...}`` document, or a 500
        ``HTTPError`` object when demuxing raises ``CuckooDemuxError``.
    """
    def _flag(raw):
        # Only 'false' (any case) and '0' are false; everything else is true.
        return not (raw.upper() == 'FALSE' or raw == '0')

    data = request.files.file
    form = request.forms

    package = form.get("package", "")
    timeout = form.get("timeout", "")
    priority = form.get("priority", 1)
    options = form.get("options", "")
    machine = form.get("machine", "")
    platform = form.get("platform", "")
    tags = form.get("tags", None)
    custom = form.get("custom", "")
    memory = _flag(form.get("memory", 'False'))
    clock = form.get("clock", None)
    shrike_url = form.get("shrike_url", None)
    shrike_msg = form.get("shrike_msg", None)
    shrike_sid = form.get("shrike_sid", None)
    shrike_refer = form.get("shrike_refer", None)
    enforce_timeout = _flag(form.get("enforce_timeout", 'False'))

    temp_file_path = store_temp_file(data.file.read(), data.filename)
    try:
        task_ids = db.demux_sample_and_add_to_db(
            file_path=temp_file_path,
            package=package,
            timeout=timeout,
            options=options,
            priority=priority,
            machine=machine,
            platform=platform,
            custom=custom,
            memory=memory,
            enforce_timeout=enforce_timeout,
            tags=tags,
            clock=clock,
            shrike_url=shrike_url,
            shrike_msg=shrike_msg,
            shrike_sid=shrike_sid,
            shrike_refer=shrike_refer)
    except CuckooDemuxError as e:
        return HTTPError(500, e)

    return jsonize({"task_ids": task_ids})
def tasks_reschedule(task_id):
    """Reschedule an existing task.

    Args:
        task_id: Identifier passed to ``db.view_task`` and ``db.reschedule``.

    Returns:
        The ``jsonize``-d ``{"status": "OK"}`` document, a 404 ``HTTPError``
        object when the task is unknown, or a 500 one when rescheduling
        reports failure.
    """
    if not db.view_task(task_id):
        return HTTPError(404, "There is no analysis with the specified ID")

    if not db.reschedule(task_id):
        return HTTPError(500, "An error occurred while trying to "
                              "reschedule the task")

    return jsonize({"status": "OK"})
def tasks_delete(task_id):
    """Delete a task, its analysis folder and (with FULL_DB) its stored results.

    Args:
        task_id: Integer task identifier.

    Returns:
        The ``jsonize``-d ``{"status": "OK"}`` document, a 404 ``HTTPError``
        object when the task is unknown, or a 500 one when the task is
        running or the database delete reports failure.
    """
    task = db.view_task(task_id)
    if not task:
        return HTTPError(404, "Task not found")

    if task.status == TASK_RUNNING:
        return HTTPError(500, "The task is currently being "
                              "processed, cannot delete")

    if not db.delete_task(task_id):
        return HTTPError(500, "An error occurred while trying to "
                              "delete the task")

    delete_folder(os.path.join(CUCKOO_ROOT, "storage",
                               "analyses", "%d" % task_id))

    if FULL_DB:
        analysis = results_db.analysis.find_one({"info.id": task_id})
        # The lookup may return nothing (e.g. a task that was never
        # reported); previously that crashed on ``.get`` of None after the
        # task row was already deleted.
        if analysis:
            for process in analysis.get("behavior", {}).get("processes", []):
                for call in process.get("calls", []):
                    results_db.calls.remove(call)

        results_db.analysis.remove({"info.id": task_id})

    return jsonize({"status": "OK"})
def files_get(sha256):
    """Return the stored binary whose file name is the given SHA-256.

    Args:
        sha256: File name below the ``binaries`` storage folder.

    Returns:
        The file's bytes, or a 404 ``HTTPError`` object when it does not
        exist.
    """
    file_path = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256)
    if not os.path.exists(file_path):
        return HTTPError(404, "File not found")

    response.content_type = "application/octet-stream; charset=UTF-8"
    # Close the handle deterministically instead of leaking it.
    with open(file_path, "rb") as binary_file:
        return binary_file.read()
def pcap_get(task_id):
    """Return the PCAP dump recorded for a task.

    Args:
        task_id: Integer task identifier.

    Returns:
        The dump's bytes, a 404 ``HTTPError`` object when the file does not
        exist, or a 500 one when reading it fails.
    """
    file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                             "%d" % task_id, "dump.pcap")
    if not os.path.exists(file_path):
        return HTTPError(404, "File not found")

    response.content_type = "application/octet-stream; charset=UTF-8"
    try:
        # Close the handle deterministically instead of leaking it.
        with open(file_path, "rb") as pcap_file:
            return pcap_file.read()
    except (IOError, OSError):
        # Was a bare ``except:``; only I/O failures are reported as a read
        # error now.
        return HTTPError(500, "An error occurred while reading PCAP")
def machines_view(name=None):
    """Return one machine's details via ``jsonize``.

    Args:
        name: Machine name passed to ``db.view_machine``.

    Returns:
        The ``jsonize``-d ``{"machine": ...}`` document, or a 404
        ``HTTPError`` object when the lookup returns nothing.
    """
    machine = db.view_machine(name=name)
    if not machine:
        return HTTPError(404, "Machine not found")
    return jsonize({"machine": machine.to_dict()})