我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 bottle.request。
def login_required(perm=None):
    """Build a decorator that only runs a view for an authenticated session.

    The session is read from ``request.environ['beaker.session']``. A request
    counts as authenticated when the session has a truthy ``name`` and a
    truthy ``authenticated`` entry.

    Args:
        perm: Optional permission key. When given, the mapping returned by
            ``parse_permissions(session)`` must contain a truthy value for it.

    Returns:
        A decorator. The wrapped view returns the original view's result, or
        a 403 ``HTTPError`` for AJAX callers, or the result of ``redirect``
        to ``/login`` (not authenticated) or ``/nopermission`` (missing
        permission) for everyone else.
    """
    # Local import: only this snippet is visible, so the module's import
    # block cannot be relied on to provide functools.
    import functools

    def _deny(location):
        """Reject the request: 403 for XMLHttpRequest callers, else redirect."""
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return HTTPError(403, "Forbidden")
        return redirect(location)

    def _dec(func):
        @functools.wraps(func)  # keep the view's __name__/__doc__ intact
        def _view(*args, **kwargs):
            s = request.environ.get('beaker.session')
            # No session object in the environ is treated as "not logged in"
            # instead of failing with AttributeError on None.
            if s is None or not (s.get("name", None) and s.get("authenticated", False)):
                return _deny("/login")
            if perm:
                perms = parse_permissions(s)
                if perm not in perms or not perms[perm]:
                    return _deny("/nopermission")
            return func(*args, **kwargs)
        return _view
    return _dec
def translate(self):
    """Handle one translation request and return the serialized response.

    The incoming request is parsed with ``request_provider`` for this
    server's style, translated, and wrapped by ``response_provider``. Word
    alignments and word probabilities are only included when the request
    settings ask for them; otherwise those fields are ``None``.
    """
    incoming = request_provider(self._style, request)
    logging.debug("REQUEST - " + repr(incoming))

    results = self._translator.translate(incoming.segments, incoming.settings)

    status = TranslationResponse.STATUS_OK
    segments = [item.target_words for item in results]

    if incoming.settings.get_alignment:
        alignments = [item.get_alignment_json(as_string=False) for item in results]
    else:
        alignments = None

    if incoming.settings.get_word_probs:
        probabilities = [item.target_probs for item in results]
    else:
        probabilities = None

    outgoing = response_provider(
        self._style,
        status=status,
        segments=segments,
        word_alignments=alignments,
        word_probabilities=probabilities,
    )
    logging.debug("RESPONSE - " + repr(outgoing))

    response.content_type = outgoing.get_content_type()
    return repr(outgoing)
def code_verify(request):
    """Recognise the captcha image posted in the ``code`` form field.

    The posted bytes are written to a uniquely named PNG under
    ``<curpath>/temp``, opened with ``Image.open`` and handed to the
    ``verify`` function of ``main.main_<type>``, where ``<type>`` is the
    integer ``type`` form field.

    Args:
        request: Request object exposing ``forms.get``.

    Returns:
        Whatever ``main_<type>.verify(img, code_template_dic[type])`` returns.

    Raises:
        ValueError / TypeError: if the ``type`` field is not an integer.
        ImportError / AttributeError: if ``main.main_<type>`` does not exist.
    """
    # Local import: hashlib replaces the long-deprecated ``md5`` module and
    # may not be imported at file level.
    import hashlib

    image_str = request.forms.get("code")
    typ = int(request.forms.get('type'))

    seed = (str(time.time()) + str(random.random())).encode('utf-8')
    filename = os.path.join(curpath, 'temp', '%s.png' % hashlib.md5(seed).hexdigest())

    try:
        with open(filename, 'wb') as f:
            f.write(image_str)
        img = Image.open(filename)
        try:
            # Equivalent of ``from main import main_<typ>`` without building
            # source text for ``exec``; ``typ`` is already forced to int.
            module_name = 'main_%s' % typ
            verifier = getattr(__import__('main', fromlist=[module_name]), module_name)
            code = verifier.verify(img, code_template_dic[typ])
        finally:
            # Some image classes have no close(); only call it when present.
            if hasattr(img, 'close'):
                img.close()
    finally:
        # Always try to remove the temp file, even when recognition failed.
        try:
            os.remove(filename)
        except Exception as e:
            print(u"?????? %s" % e)
    return code
def _load_pool_stats(config_path, leases_path):
    """Run the pools binary in JSON mode; return an empty report on failure."""
    try:
        return json.loads(exec_command(
            [args.binary, '-c', config_path, '-l', leases_path, '-f', 'j']))
    except Exception:
        # Best effort: a missing binary/config or unparsable output just
        # produces no metrics for this IP version.
        return {'shared-networks': []}


def _pool_metric_lines(ip_version, stats):
    """Render used/free/usage metric lines for every shared network in *stats*."""
    lines = []
    for shared_network in stats['shared-networks']:
        location = shared_network['location']
        lines.append('dhcp_pool_used{ip_version="%s",network="%s"} %s'
                     % (ip_version, location, shared_network['used']))
        lines.append('dhcp_pool_free{ip_version="%s",network="%s"} %s'
                     % (ip_version, location, shared_network['free']))
        defined_leases = float(shared_network['defined'])
        # Avoid dividing by zero for networks without defined leases.
        leases_used_percentage = 0
        if defined_leases > 0:
            leases_used_percentage = float(shared_network['used']) / defined_leases
        lines.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s'
                     % (ip_version, location, leases_used_percentage))
    return lines


def prometheus_metrics():
    """Return DHCPv4 and DHCPv6 pool metrics as plain text, one per line.

    Returns an empty string when ``test_restricted`` rejects the caller's
    ``REMOTE_ADDR``. Otherwise sets the response content type to
    ``text/plain`` and returns the IPv4 metrics followed by the IPv6 ones.
    """
    if not test_restricted(request['REMOTE_ADDR']):
        return ''

    dhcpstat = _load_pool_stats(args.dhcp4_config, args.dhcp4_leases)
    dhcp6stat = _load_pool_stats(args.dhcp6_config, args.dhcp6_leases)

    data = _pool_metric_lines(4, dhcpstat) + _pool_metric_lines(6, dhcp6stat)

    response.content_type = 'text/plain'
    return '%s\n' % ('\n'.join(data))
def auth_server():
    """Answer a token request based on the posted ``username`` form field.

    Known usernames map to fixed access tokens, ``auth_fail`` yields an
    HTTP 400 error payload, and anything else gets an empty dict.
    """
    tokens_by_user = {
        'user_ok': 'my-nifty-access-token',
        'create_fail': 'the-token-with-which-create-will-fail',
        'delete_fail': 'the-token-with-which-delete-will-fail',
        'empty_page': 'the-token-which-causes-an-empty-page',
    }

    username = request.forms.get('username')
    if username == 'auth_fail':
        response.status = 400
        return {'error': 'errored with HTTP 400 on request'}
    if username in tokens_by_user:
        return {'access_token': tokens_by_user[username]}
    return {}
def create():
    """Answer a user-creation call according to the bearer token sent.

    The token is the second whitespace-separated word of the
    ``Authorization`` header. Unrecognised tokens fall through and return
    ``None``.
    """
    header_words = request.headers.get('Authorization').split()
    token = header_words[1]

    if token == 'my-nifty-access-token':
        return {'response': 'Successful creation of user.'}
    if token == 'the-token-with-which-create-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
    if token == 'the-token-which-causes-an-empty-page':
        response.status = 403
        return "empty"
    return None
def delete():
    """Answer a user-deletion call according to the bearer token sent.

    The token is the second whitespace-separated word of the
    ``Authorization`` header. Unrecognised tokens fall through and return
    ``None``.
    """
    header_words = request.headers.get('Authorization').split()
    token = header_words[1]

    if token == 'my-nifty-access-token':
        return {'response': 'Successful deletion of user.'}
    if token == 'the-token-with-which-delete-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
    return None
def load_client(context):
    """Create an ``AttributeRequest`` from the transform settings in *context*.

    When the ``test_local`` setting equals the string ``'True'`` the client
    is pointed at the configured ``server``/``version``; otherwise it is
    built with debug headers derived from the current request.
    """
    setting = context.getTransformSetting
    username = setting('username')
    api_key = setting('aKey')

    if setting('test_local') != 'True':
        return AttributeRequest(username, api_key, headers=gen_debug(request))

    server = setting('server')
    version = setting('version')
    return AttributeRequest(username, api_key, server, version)
def load_client(context):
    """Create a ``DnsRequest`` from the transform settings in *context*.

    When the ``test_local`` setting equals the string ``'True'`` the client
    is pointed at the configured ``server``/``version``; otherwise it is
    built with debug headers derived from the current request.
    """
    setting = context.getTransformSetting
    username = setting('username')
    api_key = setting('aKey')

    if setting('test_local') != 'True':
        return DnsRequest(username, api_key, headers=gen_debug(request))

    server = setting('server')
    version = setting('version')
    return DnsRequest(username, api_key, server, version)
def load_enrichment(context):
    """Create an ``EnrichmentRequest`` from the transform settings in *context*.

    When the ``test_local`` setting equals the string ``'True'`` the client
    is pointed at the configured ``server``/``version`` with ``debug=True``;
    otherwise it is built with debug headers derived from the current request.
    """
    setting = context.getTransformSetting
    username = setting('username')
    api_key = setting('aKey')

    if setting('test_local') != 'True':
        return EnrichmentRequest(username, api_key, headers=gen_debug(request))

    server = setting('server')
    version = setting('version')
    return EnrichmentRequest(username, api_key, server, version, debug=True)
def load_client(context):
    """Create an ``SslRequest`` from the transform settings in *context*.

    When the ``test_local`` setting equals the string ``'True'`` the client
    is pointed at the configured ``server``/``version``; otherwise it is
    built with debug headers derived from the current request.
    """
    setting = context.getTransformSetting
    username = setting('username')
    api_key = setting('aKey')

    if setting('test_local') != 'True':
        return SslRequest(username, api_key, headers=gen_debug(request))

    server = setting('server')
    version = setting('version')
    return SslRequest(username, api_key, server, version)
def load_client(context):
    """Create an ``ActionsClient`` from the transform settings in *context*.

    When the ``test_local`` setting equals the string ``'True'`` the client
    is pointed at the configured ``server``/``version``; otherwise it is
    built with debug headers derived from the current request.
    """
    setting = context.getTransformSetting
    username = setting('username')
    api_key = setting('aKey')

    if setting('test_local') != 'True':
        return ActionsClient(username, api_key, headers=gen_debug(request))

    server = setting('server')
    version = setting('version')
    return ActionsClient(username, api_key, server, version)
def load_client(context):
    """Create an ``EnrichmentRequest`` from the transform settings in *context*.

    When the ``test_local`` setting equals the string ``'True'`` the client
    is pointed at the configured ``server``/``version``; otherwise it is
    built with debug headers derived from the current request.
    """
    setting = context.getTransformSetting
    username = setting('username')
    api_key = setting('aKey')

    if setting('test_local') != 'True':
        return EnrichmentRequest(username, api_key, headers=gen_debug(request))

    server = setting('server')
    version = setting('version')
    return EnrichmentRequest(username, api_key, server, version)
def add_organization_html(ending):
    """Submit the posted organization for scraping.

    For the ``json`` ending a status dict with the organization's API urls
    is returned; otherwise an HTML page linking to the organization's
    status is rendered.
    """
    organization = request.forms.get('organization')
    scraper.scrape_organization(organization)

    if ending != "json":
        return template("added-organization.html", organization=organization)

    urls = api.get_organization_urls(organization)
    return {"status": "ok", "urls": urls}
def get_all_organizations(ending):
    """List organizations, paginated by the ``offset`` and ``limit`` query args.

    ``limit`` defaults to ``DEFAULT_PAGINATION_COUNT`` and is capped at
    ``MAX_PAGINATION_COUNT``. The ``json`` ending returns the API listing;
    any other ending renders the HTML page.
    """
    query = request.query
    start = int(query.get("offset", 0))
    requested = int(query.get("limit", DEFAULT_PAGINATION_COUNT))
    count = min(requested, MAX_PAGINATION_COUNT)

    if ending != "json":
        return template("organizations.html", start=start, count=count)
    return api.get_organizations(start, count)
def add_repository(ending):
    """Submit the posted repository for scraping.

    The ``repository`` form field is split on ``/`` into exactly an
    organization name and a repository name. For the ``json`` ending a
    status dict with the repository's API urls is returned; otherwise an
    HTML page linking to the repository's status is rendered.
    """
    repository = request.forms.get('repository')
    # Unpacking raises if the value does not contain exactly one "/".
    organization_name, repository_name = repository.split("/")
    scraper.scrape_repository(repository)

    if ending != "json":
        return template("added-repository.html", repository=repository)

    urls = api.get_repository_urls(organization_name, repository_name)
    return {"status": "ok", "urls": urls}
def add_authentication():
    """Register posted GitHub credentials for use by the scraper.

    The ``username``/``password`` pair is stored only when ``check_login``
    accepts it; the matching success or failure page is served.
    """
    # Form access follows
    # http://bottlepy.org/docs/dev/tutorial.html#http-request-methods
    login = (request.forms.get('username'), request.forms.get('password'))

    if not check_login(login):
        return static("add-github-authentication-failure.html")

    credentials.add(login)
    return static("add-github-authentication-success.html")
def __enter__(self):
    """Install the fake environ and extra attributes on ``bottle.request``.

    The previous environ and any pre-existing values of the extra
    attributes are remembered so they can be put back on exit.

    Returns:
        This context manager, so ``with ... as name`` binds a usable object
        instead of ``None``.
    """
    self.orig = bottle.request.environ
    bottle.request.environ = self.environ
    for k, v in self.extras.items():
        # Remember only attributes that already existed; the others are
        # recognisable later by their absence from extra_orig.
        if hasattr(bottle.request, k):
            self.extra_orig[k] = getattr(bottle.request, k)
        setattr(bottle.request, k, v)
    # Replace the class-level ``app`` attribute with a plain truthy value.
    setattr(bottle.BaseRequest, 'app', True)
    return self
def __exit__(self, a, b, c):
    """Restore ``bottle.request`` to the state saved on entry.

    Extra attributes that existed before are reset to their saved values;
    the ones that were newly added are deleted again. The three exception
    arguments are ignored, so exceptions propagate.
    """
    bottle.request.environ = self.orig
    for name in self.extras:
        if name in self.extra_orig:
            setattr(bottle.request, name, self.extra_orig[name])
            continue
        try:
            delattr(bottle.request, name)
        except AttributeError:
            pass
    setattr(bottle.BaseRequest, 'app', self.orig_app_reader)
def testParams(self):
    """Params passed to boddle are readable through ``bottle.request.params``."""
    expected = 'derek'
    with boddle(params={'name': 'derek'}):
        actual = bottle.request.params['name']
        self.assertEqual(actual, expected)
def test_no_params_no_throw(self):
    """Without params, ``bottle.request.params`` is empty and does not raise."""
    with boddle():
        found = list(bottle.request.params.items())
        self.assertEqual(found, [])
def testGetParams(self):
    """Params are also readable when the faked method is GET."""
    expected = 'derek'
    with boddle(method='get', params={'name': 'derek'}):
        actual = bottle.request.params['name']
        self.assertEqual(actual, expected)
def testPath(self):
    """A nested boddle overrides the path and the outer one is restored."""
    outer_path = '/derek'
    inner_path = '/anderson'
    with boddle(path=outer_path):
        self.assertEqual(bottle.request.path, '/derek')
        with boddle(path=inner_path):
            self.assertEqual(bottle.request.path, '/anderson')
        # Leaving the inner context must bring the outer path back.
        self.assertEqual(bottle.request.path, '/derek')
def testMethod(self):
    """A lower-case method passed to boddle is reported upper-cased."""
    with boddle(method='post'):
        reported = bottle.request.method
        self.assertEqual(reported, 'POST')
def testHeaders(self):
    """A header set in lower case can be looked up in upper case."""
    with boddle(headers={'x_header': 'value'}):
        seen = bottle.request.headers['X_HEADER']
        self.assertEqual(seen, 'value')
def testHyphenatedHeaders(self):
    """A hyphenated header set in lower case can be looked up in upper case."""
    with boddle(headers={'x-header': 'value'}):
        seen = bottle.request.headers['X-HEADER']
        self.assertEqual(seen, 'value')
def testExtraStuff(self):
    """Arbitrary keyword extras appear on the request and vanish afterwards."""
    with boddle(extra='woot'):
        self.assertEqual(bottle.request.extra, 'woot')
        with boddle(extra='woot2'):
            self.assertEqual(bottle.request.extra, 'woot2')
    # Once every context has exited the attribute must be gone again.
    still_there = hasattr(bottle.request, 'extra')
    self.assertFalse(still_there)
def testURL(self):
    """A full URL passed to boddle drives ``url``, ``fullpath`` and ``path``."""
    target = 'https://github.com/keredson/boddle'
    with boddle(url=target):
        req = bottle.request
        self.assertEqual(req.url, 'https://github.com/keredson/boddle')
        self.assertEqual(req.fullpath, '/keredson/boddle')
        self.assertEqual(req.path, '/keredson/boddle')
def testRevert(self):
    """A nested boddle overrides params and the outer ones are restored."""
    with boddle(params={'name': 'derek'}):
        self.assertEqual(bottle.request.params['name'], 'derek')
        with boddle(params={'name': 'anderson'}):
            inner = bottle.request.params['name']
            self.assertEqual(inner, 'anderson')
        # Leaving the inner context must bring the outer params back.
        outer = bottle.request.params['name']
        self.assertEqual(outer, 'derek')
def testBody(self):
    """A string body is exposed as bytes through ``bottle.request.body``."""
    with boddle(body='body'):
        whole = bottle.request.body.read()
        self.assertEqual(whole, b'body')
        # NOTE(review): this second read only passes if each access to
        # ``request.body`` starts from the beginning again - confirm.
        first_line = bottle.request.body.readline()
        self.assertEqual(first_line, b'body')
def set_session(request, info):
    """Mark the beaker session of *request* as authenticated and fill it.

    Args:
        request: Object whose ``environ`` holds the session under
            ``'beaker.session'``.
        info: Mapping with ``id``, ``name``, ``role``, ``permission`` and
            ``template`` entries describing the user.

    Returns:
        The session object, after ``save()`` has been called on it.
    """
    session = request.environ.get('beaker.session')
    session["authenticated"] = True

    # (session key, info key) pairs, copied in this order.
    copied_fields = (
        ("user_id", "id"),
        ("name", "name"),
        ("role", "role"),
        ("perms", "permission"),
        ("template", "template"),
    )
    for session_key, info_key in copied_fields:
        session[session_key] = info[info_key]

    session.save()
    return session
def get_preview_image():
    """Render the label described by the request and return it as PNG data.

    With ``return_format=base64`` in the query string the PNG bytes are
    base64-encoded and served as ``text/plain``; any other value serves
    the raw bytes as ``image/png``.
    """
    context = get_label_context(request)
    im = create_label_im(**context)

    wants_base64 = request.query.get('return_format', 'png') == 'base64'
    if not wants_base64:
        response.set_header('Content-type', 'image/png')
        return image_to_png_bytes(im)

    import base64
    response.set_header('Content-type', 'text/plain')
    return base64.b64encode(image_to_png_bytes(im))
def web_index(token=None):
    """Serve an index page listing every known button.

    This function is called from far far away, over the Internet.

    The token is only checked when the server settings contain a ``key``;
    it must then decode to ``'index'``. On failure the error is re-raised
    when the root logger is at DEBUG level, otherwise the response status
    is set to 400 and ``'Invalid request'`` is returned.
    """
    global buttons

    logging.info('Serving index page')

    try:
        if 'key' in settings['server'] and decode_token(settings, token) != 'index':
            raise ValueError('Invalid label in token')
    except Exception as feedback:
        debugging = logging.getLogger().getEffectiveLevel() == logging.DEBUG
        if debugging:
            logging.error("Unable to serve the index page")
            raise
        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'

    items = [
        {
            'label': button,
            'delete-url': '/delete/' + settings['tokens'].get(button + '-delete'),
            'initialise-url': '/initialise/' + settings['tokens'].get(button + '-initialise'),
            'push-url': '/' + settings['tokens'].get(button),
        }
        for button in buttons
    ]

    logging.debug('Buttons: {}'.format(items))

    return template('views/list_items', prefix=settings['server']['url'], items=items)

#
# invoked from bt.tn
#
def web_press(button=None):
    """Process the press of a bt.tn device.

    This function is called from far far away, over the Internet.

    Without a button, the server's default one is used. On failure the
    error is re-raised when the root logger is at DEBUG level; otherwise
    socket errors produce status 500 / ``'Internal error'`` and every
    other error produces status 400 / ``'Invalid request'``.
    """
    if button is None:
        button = settings['server']['default']

    try:
        button = decode_token(settings, button)
        context = load_button(settings, button)
        return handle_button(context)
    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise

        logging.error(str(feedback))
        if isinstance(feedback, socket.error):
            response.status = 500
            return 'Internal error'
        response.status = 400
        return 'Invalid request'
def web_initialise(button=None):
    """Initialise a room: delete it, forget the button, then fetch the room again.

    This function is called from far far away, over the Internet.

    Without a button, the server's default one is used. Returns ``'OK'``
    on success. On failure the error is re-raised when the root logger is
    at DEBUG level, otherwise the response status is set to 400 and
    ``'Invalid request'`` is returned.
    """
    global buttons

    if button is None:
        button = settings['server']['default']

    logging.info("Initialising button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='initialise')

        delete_room(load_button(settings, button))
        buttons.pop(button, None)

        # Load the button a second time, after it was dropped from ``buttons``.
        context = load_button(settings, button)
        context['spark']['id'] = get_room(context)
    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to initialise '{}'".format(button))
            raise
        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'

    return 'OK'
def web_delete(button=None):
    """Delete a room and forget its button.

    This function is called from far far away, over the Internet.

    Without a button, the server's default one is used. Returns ``'OK'``
    on success. On failure the error is re-raised when the root logger is
    at DEBUG level, otherwise the response status is set to 400 and
    ``'Invalid request'`` is returned.
    """
    global buttons

    if button is None:
        button = settings['server']['default']

    logging.info("Deleting button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='delete')
        delete_room(load_button(settings, button))
        buttons.pop(button, None)
    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to delete '{}'".format(button))
            raise
        logging.error(str(feedback))
        response.status = 400
        return 'Invalid request'

    return 'OK'
def _handle_get(self, request_data): """ An OCSP GET request contains the DER-in-base64 encoded OCSP request in the HTTP request URL. """ der = base64.b64decode(request_data) ocsp_request = self._parse_ocsp_request(der) return self._build_http_response(ocsp_request)
def _handle_post(self):
    """Answer an OCSP POST request.

    For POST, the DER-encoded OCSP request is the raw HTTP request body.
    """
    ocsp_request = self._parse_ocsp_request(request.body.read())
    http_response = self._build_http_response(ocsp_request)
    return http_response
def _parse_ocsp_request(self, request_der: bytes) -> OCSPRequest:
    """Turn the raw request bytes into an ``OCSPRequest`` via ``OCSPRequest.load``."""
    parsed = OCSPRequest.load(request_der)
    return parsed
def _device_set(self):
    """Set a device value from the JSON body of a PUT request.

    The body must be an ASCII JSON object with ``name`` and ``value``
    entries, which are passed to ``self.set_device_value``.

    Returns:
        A JSON string: ``{}`` on success, or ``{"error": "<message>"}``
        when the request could not be applied and debugging is off.

    Raises:
        ValueError / KeyError (or whatever ``set_device_value`` raises):
            only when ``self._debug`` is set, so failures surface with a
            traceback instead of being folded into the JSON reply.
    """
    response.headers['Content-Type'] = 'application/json'
    retval = {}

    def verify_and_set(client):
        """Validate the request body and apply it to *client*."""
        try:
            data = json.loads(request.body.read().decode('ascii'))
        except Exception:
            # ``Exception`` rather than a bare except, so KeyboardInterrupt
            # and SystemExit are not rewritten into a ValueError.
            raise ValueError('Unable to decode json data from PUT request')
        if data is None:
            raise ValueError('No data set in body of PUT request')
        if 'name' not in data:
            raise KeyError('No "name" field in body of PUT request')
        if 'value' not in data:
            raise KeyError('No "value" field in body of PUT request')
        client.set_device_value(data['name'], data['value'])

    if self._debug:
        verify_and_set(self)
    else:
        try:
            verify_and_set(self)
        except Exception as e:
            retval['error'] = str(e)

    return json.dumps(retval)
def tasks_create_file():
    """Create analysis task(s) for an uploaded file.

    The upload is stored in a temporary file and handed, together with the
    posted options, to ``db.demux_sample_and_add_to_db``. Returns the
    jsonized ``{"task_ids": ...}`` mapping, or an ``HTTPError`` 500 when
    demuxing fails with ``CuckooDemuxError``.
    """
    forms = request.forms

    def _flag(field):
        """Read a form flag: only 'false' (any case) and '0' mean False."""
        raw = forms.get(field, 'False')
        return not (raw.upper() == 'FALSE' or raw == '0')

    data = request.files.file

    submission = dict(
        package=forms.get("package", ""),
        timeout=forms.get("timeout", ""),
        priority=forms.get("priority", 1),
        options=forms.get("options", ""),
        machine=forms.get("machine", ""),
        platform=forms.get("platform", ""),
        tags=forms.get("tags", None),
        custom=forms.get("custom", ""),
        memory=_flag("memory"),
        clock=forms.get("clock", None),
        shrike_url=forms.get("shrike_url", None),
        shrike_msg=forms.get("shrike_msg", None),
        shrike_sid=forms.get("shrike_sid", None),
        shrike_refer=forms.get("shrike_refer", None),
        enforce_timeout=_flag("enforce_timeout"),
    )

    temp_file_path = store_temp_file(data.file.read(), data.filename)

    try:
        task_ids = db.demux_sample_and_add_to_db(file_path=temp_file_path, **submission)
    except CuckooDemuxError as e:
        return HTTPError(500, e)

    return jsonize({"task_ids": task_ids})
def tasks_list(limit=None, offset=None):
    """Return a jsonized list of tasks, ordered by completion time.

    Query arguments: ``completed_after`` (a timestamp), ``status`` and
    ``ids``. When ``ids`` is given, each task is reduced to its ``id`` and
    ``completed_on``; otherwise guest, errors and sample details are added.
    """
    completed_after = request.GET.get("completed_after")
    if completed_after:
        completed_after = datetime.fromtimestamp(int(completed_after))

    status = request.GET.get("status")

    # optimisation required for dist speedup
    ids = request.GET.get("ids")

    rows = db.list_tasks(limit=limit, details=True, offset=offset,
                         completed_after=completed_after, status=status,
                         order_by=Task.completed_on.asc())

    tasks = []
    for row in rows:
        task = row.to_dict()
        if ids:
            task = {"id": task["id"], "completed_on": task["completed_on"]}
        else:
            task["guest"] = row.guest.to_dict() if row.guest else {}
            task["errors"] = [error.message for error in row.errors]
            if row.sample_id:
                task["sample"] = db.view_sample(row.sample_id).to_dict()
            else:
                task["sample"] = {}
        tasks.append(task)

    return jsonize({"tasks": tasks})
def enable_cors(fn):
    """Decorate a route handler so its responses carry CORS headers.

    The headers are set on every call. For ``OPTIONS`` requests the
    handler itself is skipped and ``None`` is returned; any other method
    is passed on to *fn*.
    """
    # Local import: only this snippet is visible, so the module's import
    # block cannot be relied on to provide functools.
    import functools

    @functools.wraps(fn)  # keep the handler's __name__/__doc__ intact
    def _enable_cors(*args, **kwargs):
        # set CORS headers
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'

        if bottle.request.method != 'OPTIONS':
            # actual request; reply with the actual response
            return fn(*args, **kwargs)

    return _enable_cors
def autocomplete():
    """Return autocomplete suggestions for the ``q`` query argument.

    Any error raised while querying is printed and logged, and an empty
    string is returned instead.
    """
    try:
        q = request.query.get("q")
        return cns.es_autocomplete(q)
    except Exception:
        # ``Exception`` rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate.
        traceback.print_exc()
        logging.info("error")
        return ""
def search():
    """Return search results for the ``q`` and ``offset`` query arguments.

    ``offset`` defaults to 0 and is otherwise passed on as received. Any
    error raised while querying is printed and logged, and an empty string
    is returned instead.
    """
    try:
        q = request.query.get("q")
        offset = request.query.get("offset", 0)
        return cns.es_search(q, offset)
    except Exception:
        # ``Exception`` rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate.
        traceback.print_exc()
        logging.info("error")
        return ""
def enable_cors(self):
    """Enable Cross Origin Resource Sharing for this application.

    An ``after_request`` hook adds the access-control headers to every
    response, and the 405 error handler is replaced so ``OPTIONS``
    requests get an empty response that echoes the requested method and
    headers.

    :rtype: :class:`WebBuilder`
    """
    def access_control_headers():
        headers = bottle.response.headers
        headers['Access-Control-Allow-Origin'] = '*'
        headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
        headers['Access-Control-Allow-Headers'] = \
            'Origin, X-Requested-With, Content-Type, Accept, Authorization'

    def options_response(res):
        if bottle.request.method != 'OPTIONS':
            # A genuine 405: advertise OPTIONS too and defer to the default handler.
            res.headers['Allow'] += ', OPTIONS'
            return bottle.request.app.default_error_handler(res)

        preflight = bottle.HTTPResponse()
        asked = bottle.request.headers
        preflight.headers['Access-Control-Allow-Origin'] = '*'
        preflight.headers['Access-Control-Allow-Methods'] = asked.get(
            'Access-Control-Request-Method', '')
        preflight.headers['Access-Control-Allow-Headers'] = asked.get(
            'Access-Control-Request-Headers', '')
        return preflight

    self.app.add_hook('after_request', access_control_headers)
    self.app.error_handler[405] = options_response
    return self
def invalidate():
    """Invalidates all results; sends request for updates.

    Recommended once per year. The body is currently empty, so calling
    this does nothing and returns ``None``.
    """
    return None
def code_verify_out():
    """Run ``code_verify`` on the current request, never raising.

    Returns the recognised code, or the string ``'ERROR'`` after printing
    the traceback when recognition fails.
    """
    try:
        return code_verify(request)
    except Exception:
        print(traceback.format_exc())
        return 'ERROR'
def create_user_with_key():
    """Create a user directory with a keyfile on the shared volume.

    The username is taken from the request and the public key from the
    ``pubkey`` entry of the JSON payload. Replies 422 when either one is
    missing, 400 when the username is not allowed, and 201 on success.
    """
    username = username_from_request(request)
    if not username:
        response.status = 422
        return {'error': "Parameter 'username' not specified"}
    if not username_valid(username):
        response.status = 400
        message = "Invalid parameter 'username': '{0}' not allowed.".format(username)
        return {'error': message}

    pubkey = request.json.get('pubkey')
    if not pubkey:
        response.status = 422
        return {'error': "Parameter 'pubkey' not specified"}

    abs_home_path = normpath(os.path.join(HOME_PATH_PREFIX, username))
    # The result of check_and_add is not used by this handler.
    check_and_add(username)

    # Do the actual creation
    store_pubkey(username, abs_home_path, pubkey)

    response.status = 201
    message = 'Successful creation of user {0} and/or upload of key.'.format(username)
    return {'response': message}
def insert_countdown():
    """Insert a countdown into the database from the current request.

    The work is delegated to ``json_api.insert_countdown``, whose result
    is returned unchanged.
    """
    result = json_api.insert_countdown(request)
    return result

# Run this only if we are not in production