我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用bottle.request.query()。
def _request(request, request_fallback=None): ''' Extract request fields wherever they may come from: GET, POST, forms, fallback ''' # Use lambdas to avoid evaluating bottle.request.* which may throw an Error all_dicts = [ lambda: request.json, lambda: request.forms, lambda: request.query, lambda: request.files, #lambda: request.POST, lambda: request_fallback ] request_dict = dict() for req_dict_ in all_dicts: try: req_dict = req_dict_() except KeyError: continue if req_dict is not None and hasattr(req_dict, 'items'): for req_key, req_val in req_dict.items(): request_dict[req_key] = req_val return request_dict
def dtos_get_films_with_actors(): try: queryObject = QueryObject( filter = "ReleaseYear='{0}'".format(request.query['releaseYear']), expand = ['FilmActors.Actor', 'FilmCategories'] ) resultSerialData = dataService.dataViewDto.getItems("Film", queryObject) return json.dumps(resultSerialData, cls=CustomEncoder, indent=2) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: POST: api/datasource/crud/operations/dtos/TestAction # with: Content-Type: application/json and body - {"param1":1}
def entities_get_films_with_actors(): try: queryObject = QueryObject( filter = "ReleaseYear='{0}'".format(request.query['releaseYear']), expand = ['FilmActors.Actor', 'FilmCategories'] ) resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject) return json.dumps(resultSerialData, cls=CustomEncoder, indent=2) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/operations/entities/TestAction # with: Content-Type: application/json and body - {"param1":1}
def _handle_cert(self, domain): rawcert = request.environ['ssl_certificate'] certconfig = self._get_cert_config_if_allowed(domain, rawcert) logger.debug('Fetching certificate for domain %s', domain) (key, crt, chain) = self.acmeproxy.get_cert( domain=domain, altname=certconfig.altname, rekey=certconfig.rekey, renew_margin=certconfig.renew_margin, force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'), # pylint: disable=unsupported-membership-test,unsubscriptable-object auto_renew=certconfig.renew_on_fetch ) return { 'crt': crt.decode(), 'key': key.decode(), 'chain': chain.decode() }
def assertRedirect(self, target, result, query=None, status=303, **args): env = {'SERVER_PROTOCOL':'HTTP/1.1'} for key in list(args): if key.startswith('wsgi'): args[key.replace('_', '.', 1)] = args[key] del args[key] env.update(args) request.bind(env) bottle.response.bind() try: bottle.redirect(target, **(query or {})) except bottle.HTTPResponse: r = _e() self.assertEqual(status, r.status_code) self.assertTrue(r.headers) self.assertEqual(result, r.headers['Location'])
def _filter_row(row, filters): def filterfunc(row, filt): val = filters[filt]['val'] if filt == "tracks": tracks_set = set(val.split('|')) return row['trackrel'] in tracks_set elif filt.endswith(" (min)"): filt = filt.replace(" (min)", "") val = float(val) return row[filt] >= val elif filt.endswith(" (max)"): filt = filt.replace(" (max)", "") val = float(val) return row[filt] <= val elif isinstance(row[filt], bool): # need to convert our string query values to bool for the comparison return row[filt] == bool(val) else: return row[filt] == val return all(filterfunc(row, filt) for filt in filters)
def download(): trackrels = request.query.tracks.split('|') # write the archive into a temporary in-memory file-like object temp = tempfile.SpooledTemporaryFile() with zipfile.ZipFile(temp, 'w', zipfile.ZIP_DEFLATED) as archive: for trackrel in trackrels: base_wildcard = trackrel.replace("-track.csv", "*") paths = config.TRACKDIR.glob(base_wildcard) for path in paths: archive.write(str(path), str(path.relative_to(config.TRACKDIR)) ) temp.seek(0) # force a download; give it a filename and mime type response.set_header('Content-Disposition', 'attachment; filename="data.zip"') response.set_header('Content-Type', 'application/zip') # relying on garbage collector to delete tempfile object # (and hence the file itself) when done sending return temp
def create(controller="", method=""): return exec_command(controller=controller, method=method, input=request.query)
def _extract_params(request_dict, param_list, param_fallback=False): ''' Extract pddb parameters from request ''' if not param_list or not request_dict: return dict() query = dict() for param in param_list: # Retrieve all items in the form of {param: value} and # convert {param__key: value} into {param: {key: value}} for query_key, query_value in request_dict.items(): if param == query_key: query[param] = query_value else: query_key_parts = query_key.split('__', 1) if param == query_key_parts[0]: query[param] = {query_key_parts[1]: query_value} # Convert special string "__null__" into Python None nullifier = lambda d: {k:(nullifier(v) if isinstance(v, dict) else # pylint: disable=used-before-assignment (None if v == '__null__' else v)) for k, v in d.items()} # When fallback is enabled and no parameter matched, assume query refers to first parameter if param_fallback and all([param_key not in query.keys() for param_key in param_list]): query = {param_list[0]: dict(request_dict)} # Return a dictionary with only the requested parameters return {k:v for k, v in nullifier(query).items() if k in param_list}
def get_search(): try: q = request.query["q"] except KeyError: return [] else: results = graph.run( "MATCH (movie:Movie) " "WHERE movie.title =~ {title} " "RETURN movie", {"title": "(?i).*" + q + ".*"}) response.content_type = "application/json" return json.dumps([{"movie": dict(row["movie"])} for row in results])
def get_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
def get_single_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def get_many_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder, indent=2) except: abort(400, 'Bad Request') # PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1} #@patch('/api/datasource/crud/<entitySetName>')
def delete_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PUT: api/datasource/crud/batch/:entitySetName
def delete_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request')
def _handle_renew_all(self): rawcert = request.environ['ssl_certificate'] self._assert_admin(rawcert) result = { 'ok': [], 'error': [] } for cert in self.acmeproxy.list_certificates(): domain = cert['cn'] certconfig = self.certificates_config.match(domain) if certconfig: logger.debug('Getting certificate for domain %s', domain) try: self.acmeproxy.get_cert( domain=domain, altname=certconfig.altname, rekey=certconfig.rekey, renew_margin=certconfig.renew_margin, force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'), # pylint: disable=unsupported-membership-test,unsubscriptable-object ) result['ok'].append(domain) except Exception as e: logger.error('Encountered exception while getting certificate for domain %s (%s)', domain, e) result['error'].append(domain) else: logger.error('No configuration found for domain %s', domain) result['error'].append(domain) return result
def info(account_id, resource_id): request_data = request.query if resource_id.startswith('sg-') and 'parent_id' not in request_data: abort(400, "Missing required parameter parent_id") result = controller.info( account_id, resource_id, request_data.get('parent_id', resource_id)) response.content_type = "application/json" return json.dumps(result, indent=2, cls=Encoder) # this set to post to restrict permissions, perhaps another url space.
def test_get(self): """ Environ: GET data """ qs = tonat(tob('a=a&a=1&b=b&c=c&cn=%e7%93%b6'), 'latin1') request = BaseRequest({'QUERY_STRING':qs}) self.assertTrue('a' in request.query) self.assertTrue('b' in request.query) self.assertEqual(['a','1'], request.query.getall('a')) self.assertEqual(['b'], request.query.getall('b')) self.assertEqual('1', request.query['a']) self.assertEqual('b', request.query['b']) self.assertEqual(tonat(tob('?'), 'latin1'), request.query['cn']) self.assertEqual(touni('?'), request.query.cn)
def tracks(db): selected_filters = {} for filt in request.query.keys(): val = request.query.get(filt) if val == '': # unspecified min/max numeric values, e.g. continue # figure out query string without this selection (for backing out in web interface) querystring_without = '&'.join("{}={}".format(key, val) for key, val in request.query.items() if key != filt) # store for showing in page selected_filters[filt] = { 'val': val, 'querystring_without': querystring_without } track_data = _get_track_data(db, selected_filters) # possible filter params/values for selected rows possible_filters = _get_filters(track_data, selected_filters) if track_data else [] return template('tracks', tracks=track_data, filters=possible_filters, selected=selected_filters, query_string=request.query_string, query=request.query )
def jsonp(view): def wrapped(*posargs, **kwargs): args = {} # if we access the args via get(), # we can get encoding errors... for k in request.forms: args[k] = getattr(request.forms, k) for k in request.query: args[k] = getattr(request.query, k) callback = args.get('callback') status_code = 200 try: result = view(args, *posargs, **kwargs) except (KeyError) as e:#ValueError, AttributeError, KeyError) as e: import traceback, sys traceback.print_exc(file=sys.stdout) result = {'status':'error', 'message':'invalid query', 'details': str(e)} status_code = 403 if callback: result = '%s(%s);' % (callback, json.dumps(result)) if status_code == 200: return result else: abort(status_code, result) return wrapped
def run_profiling(self, num_loops, num_neighbors, age_proximity): """Executes the k_nearest_neighbors algorithm for num_loops times and returns the average running time Args: num_loops: number of loops for which we query the server num_neighbors: number of neighbors to query for age_proximity: maximum difference between a candidate neighbor's age and the user Returns: """ print('profiling over ', num_loops, ' times') random_latitudes = random.uniform(-90, 90, num_loops) random_longitudes = random.uniform(-180, 180, num_loops) time_list = [] for i in tqdm(range(len(random_latitudes))): start_time = time.clock() kd_store.k_nearest_neighbors({'name': 'bla bla', 'age': 23, 'latitude': random_latitudes[i] / 2, 'longitude': random_longitudes[i]}, num_neighbors, age_proximity) end_time = time.clock() time_list.append(end_time - start_time) # get the timing statistics stats_desc = stats.describe(time_list) frac_times_exceeded = len(np.where(np.array(time_list) >= 1)[0]) / len(time_list) print('\nfraction of times with delay > 1 is: ', frac_times_exceeded, '\n') print('\nStats:\n', stats_desc) return stats_desc
def login_xenforo(user=None): if user: return redirect('/') code = request.query['code'] data = {'grant_type': 'authorization_code', 'code': code, 'client_id': config['XF_CLIENT_ID'], 'client_secret': config['XF_SECRET_KEY'], 'redirect_uri': '%s/login/xenforo' % config['SITE_URL']} res = requests.post('%s/api/index.php?oauth/token' % config['XF_URL'], data = data) res_data = res.json() res.raise_for_status() pprint(res_data) user = { 'identity': res_data['user_id'], 'identity_type': 'xenforo', 'access_token': res_data['access_token'], 'refresh_token': res_data['refresh_token'], 'user_id': res_data['user_id'], 'authlink': str(uuid.uuid4()), # this is still used to unsubscribe from emails 'subscribed': True, } # get extended information res = requests.get('%s/api/?users/%s&oauth_token=%s' % (config['XF_URL'], res_data['user_id'], user['access_token'])) xf_user = res.json()['user'] user['xf_username'] = xf_user['username'] user['email'] = xf_user['user_email'] r.table('users').get(user['identity']).replace(user).run(conn()) return users.login(user)
def test_list_environments(self): parsed = json.loads(api.environments_list(self.session)) for env in parsed['environments']: self.assertTrue(env['deploy_authorized']) request.account = self.session.query(m.User).filter_by(username="username").one() parsed = json.loads(api.environments_list(self.session)) for env in parsed['environments']: if env['id'] == 1: self.assertTrue(env['deploy_authorized']) else: self.assertFalse(env['deploy_authorized'])
def test_start_deployment_non_admin(self, mocked): request.account = self.session.query(m.User).filter(m.User.username == 'username').one() self._set_body_json({ 'target': { 'cluster': None, 'server': None }, 'branch': 'master', 'commit': 'abcde' }) api.environments_start_deployment(1, self.session)
def test_start_deployment_with_impersonating(self, mocked): request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one() request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username' self._set_body_json({ 'target': { 'cluster': None, 'server': None }, 'branch': 'master', 'commit': 'abcde' }) api.environments_start_deployment(1, self.session)
def test_start_deployment_with_impersonating_unprivilegied_user(self, mocked): request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one() request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username' self._set_body_json({ 'target': { 'cluster': None, 'server': None }, 'branch': 'master', 'commit': 'abcde' }) with self.assertRaises(HTTPError) as cm: api.environments_start_deployment(2, self.session) self.assertEquals(403, cm.exception.code)
def test_expunge_default_user(self): user = self.session.query(m.User).filter_by(username="default").one() api._expunge_user(user, self.session) self.session.close() # Make sure we can access attributes after the session is closed self.assertEqual(1, len(user.roles)) self.assertEqual(0, len(user.default_roles))
def test_diff_requires_login(self): # The default user should not be able to read diff user = self.session.query(m.User).filter_by(username="default").one() request.account = user request.query['from'] = 'ab' request.query['to'] = 'de' with self.assertRaises(HTTPError) as e: api.repositories_diff(1, self.session) self.assertEqual(403, e.exception.status_code)
def test_list_deployments_requires_read(self): user = self.session.query(m.User).filter_by(username="default").one() request.account = user deploys = json.loads(api.deployments_list("my repo", self.session))['deployments'] self.assertEqual(0, len(deploys)) user = self.session.query(m.User).filter_by(username="root").one() request.account = user deploys = json.loads(api.deployments_list("my repo", self.session))['deployments'] self.assertGreater(len(deploys), 0)