我们从Python开源项目中提取了以下17个代码示例,用于说明如何使用bottle.request.environ(这是一个WSGI环境字典属性,而不是可调用的方法)。
def logten():
    """Log a request line via ``logit`` the first time an IP/path/hour key is seen.

    The key is the client address plus ``PATH_INFO`` plus an hour-granularity
    timestamp; seen keys are remembered in ``cache['url']``.

    NOTE(review): the original indentation of this snippet was lost; the
    nesting below is a reconstruction -- confirm against the upstream source.

    Returns:
        Always ``True``.
    """
    ip = request['REMOTE_ADDR']
    # The stamp only goes down to the hour, so a given IP/path pair gets a new key each hour.
    day= datetime.datetime.now().strftime("_%Y_%m_%d_%H")
    url = request.environ['PATH_INFO']+day
    if 'url' not in cache:
        cache['url'] = {}
    if ip+url not in cache['url']:
        cache['url'][ip+url]=0
        # Throw the whole table away once it holds more than 200 keys.
        if len(cache['url'])>200:
            cache['url']={}
            logit('clear cache url')
        # Requests whose User-Agent contains 'Mozilla/4.0' are not logged.
        if 'Mozilla/4.0' in request.environ.get('HTTP_USER_AGENT','no agent'):
            pass
        else:
            # NOTE(review): cache['pass'] is never set in this function; it must be
            # populated elsewhere or this line raises KeyError -- confirm.
            # NOTE(review): the path and User-Agent are interpolated into HTML unescaped.
            logit('''url @ %s [ <a href="http://www.baidu.com/s?wd=%s&_=%.0f" target="_blank">%s</a> ] %.1f <span style="color:gray">%s</span>'''%(url,ip,time.time()/10,ip,cache['pass']-time.time(),request.environ.get('HTTP_USER_AGENT','no agent')))
    return True
def _handle_cert(self, domain):
    """Fetch the key, certificate and chain for ``domain`` and return them as text.

    The client certificate found under ``ssl_certificate`` in the WSGI environ
    is handed to ``_get_cert_config_if_allowed`` together with the domain; the
    configuration it returns drives the call to ``acmeproxy.get_cert``.
    A ``force_renew=true`` query parameter forces a renewal.

    Returns:
        dict with the decoded ``crt``, ``key`` and ``chain``.
    """
    client_cert = request.environ['ssl_certificate']
    certconfig = self._get_cert_config_if_allowed(domain, client_cert)

    logger.debug('Fetching certificate for domain %s', domain)
    # A missing parameter yields None, which simply compares unequal to 'true'.
    wants_force_renew = request.query.get('force_renew') == 'true'
    key, crt, chain = self.acmeproxy.get_cert(
        domain=domain,
        altname=certconfig.altname,
        rekey=certconfig.rekey,
        renew_margin=certconfig.renew_margin,
        force_renew=wants_force_renew,
        auto_renew=certconfig.renew_on_fetch,
    )

    return {
        'crt': crt.decode(),
        'key': key.decode(),
        'chain': chain.decode(),
    }
def local_check(function):
    """Decorate a view so that only requests considered local may call it.

    A request counts as local when its ``REMOTE_ADDR`` is ``127.0.0.1`` or
    ``localhost``, or when its ``Host`` header is exactly ``127.0.0.1:9666``.
    Any other request gets an ``HTTPError(403, "Forbidden")`` returned in
    place of the view's result.

    Args:
        function: the view callable to protect.

    Returns:
        The wrapping callable; it keeps the name and docstring of ``function``.
    """
    import functools

    # wraps() keeps the view's __name__/__doc__ so tooling that inspects the
    # callable still sees the real view, not an anonymous "_view".
    @functools.wraps(function)
    def _view(*args, **kwargs):
        environ = request.environ
        # NOTE(review): the Host header is supplied by the client, so the second
        # test can be satisfied by a remote caller -- left unchanged on purpose.
        is_local = (
            environ.get('REMOTE_ADDR', "0") in ('127.0.0.1', 'localhost')
            or environ.get('HTTP_HOST', '0') == '127.0.0.1:9666'
        )
        if is_local:
            return function(*args, **kwargs)
        return HTTPError(403, "Forbidden")

    return _view
def flashgot():
    """Accept a link list posted by the FlashGot browser add-on.

    The request is only honoured when its ``Referer`` is one of the two local
    ``/flashgot`` URLs; anything else (including a missing header) gets an
    ``HTTPError``. The non-empty lines of the ``urls`` form field are added
    either as one named package (when ``package`` is given) or through
    ``PYLOAD.generateAndAddPackages``.

    Returns:
        An empty string on success, otherwise an ``HTTPError`` instance.
    """
    allowed_referers = (
        "http://localhost:9666/flashgot",
        "http://127.0.0.1:9666/flashgot",
    )
    # .get() so that a request without a Referer header is rejected like any
    # other foreign request, not blown up with a KeyError.
    if request.environ.get('HTTP_REFERER') not in allowed_referers:
        return HTTPError()

    autostart = int(request.forms.get('autostart', 0))
    package = request.forms.get('package', None)
    # A real list (not a lazy filter object) so it can be iterated more than once.
    urls = [url for url in request.forms['urls'].split("\n") if url != ""]

    if package:
        PYLOAD.addPackage(package, urls, autostart)
    else:
        PYLOAD.generateAndAddPackages(urls, autostart)

    return ""
def _handle_list_certs(self):
    """Return every certificate known to the ACME proxy.

    The client certificate from the WSGI environ is passed to
    ``_assert_admin`` before anything is listed.

    Returns:
        dict with a single ``certificates`` entry.
    """
    self._assert_admin(request.environ['ssl_certificate'])
    return {'certificates': self.acmeproxy.list_certificates()}
def _handle_renew_all(self):
    """Request a certificate for every listed domain that has a configuration.

    The client certificate from the WSGI environ is passed to
    ``_assert_admin`` first. Each certificate's ``cn`` is matched against
    ``certificates_config``; domains without a match, and domains whose
    ``get_cert`` call raises, are reported as errors.

    Returns:
        dict with two lists of domain names, ``ok`` and ``error``.
    """
    self._assert_admin(request.environ['ssl_certificate'])

    # The flag belongs to the request, not to a certificate: evaluate it once.
    force_renew = request.query.get('force_renew') == 'true'

    result = {'ok': [], 'error': []}
    for cert in self.acmeproxy.list_certificates():
        domain = cert['cn']
        certconfig = self.certificates_config.match(domain)
        if not certconfig:
            logger.error('No configuration found for domain %s', domain)
            result['error'].append(domain)
            continue

        logger.debug('Getting certificate for domain %s', domain)
        try:
            self.acmeproxy.get_cert(
                domain=domain,
                altname=certconfig.altname,
                rekey=certconfig.rekey,
                renew_margin=certconfig.renew_margin,
                force_renew=force_renew,
            )
        except Exception as e:
            # Deliberately broad: one failing domain must not abort the batch.
            logger.error('Encountered exception while getting certificate for domain %s (%s)', domain, e)
            result['error'].append(domain)
        else:
            result['ok'].append(domain)

    return result
def _handle_revoke_cert(self, domain):
    """Revoke the certificate of ``domain`` through the ACME proxy.

    The client certificate from the WSGI environ is passed to
    ``_assert_admin`` before the revocation is attempted.

    Returns:
        ``{'status': 'revoked'}``.
    """
    self._assert_admin(request.environ['ssl_certificate'])
    self.acmeproxy.revoke_certificate(domain)
    return {'status': 'revoked'}
def _handle_delete_cert(self, domain):
    """Delete the certificate of ``domain`` through the ACME proxy.

    The client certificate from the WSGI environ is passed to
    ``_assert_admin`` before the deletion is attempted.

    Returns:
        ``{'status': 'deleted'}``.
    """
    self._assert_admin(request.environ['ssl_certificate'])
    self.acmeproxy.delete_certificate(domain)
    return {'status': 'deleted'}
def test_ims(self):
    """ SendFile: If-Modified-Since"""
    http_date = "%a, %d %b %Y %H:%M:%S GMT"
    this_file = os.path.basename(__file__)

    # A "now" timestamp is newer than the file, so a 304 is expected.
    request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime(http_date, time.gmtime())
    res = static_file(this_file, root='./')
    self.assertEqual(304, res.status_code)
    self.assertEqual(int(os.stat(__file__).st_mtime), parse_date(res.headers['Last-Modified']))
    self.assertAlmostEqual(int(time.time()), parse_date(res.headers['Date']))

    # A timestamp 100 seconds after the epoch is older than the file: full body expected.
    request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime(http_date, time.gmtime(100))
    with open(__file__, 'rb') as source:
        expected = source.read()
    body = static_file(this_file, root='./').body
    served = body.read()
    # Close the served file explicitly so it is not left to the garbage collector.
    body.close()
    self.assertEqual(expected, served)
def test_download(self):
    """ SendFile: Download as attachment """
    basename = os.path.basename(__file__)

    f = static_file(basename, root='./', download=True)
    self.assertEqual('attachment; filename="%s"' % basename, f.headers['Content-Disposition'])
    # Only the headers of this response are checked; release its body if it is a file.
    if hasattr(f.body, 'close'):
        f.body.close()

    request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(100))
    f = static_file(basename, root='./')
    with open(__file__, 'rb') as source:
        expected = source.read()
    served = f.body.read()
    # Close the served file explicitly so it is not left to the garbage collector.
    f.body.close()
    self.assertEqual(expected, served)
def test_range(self):
    """ SendFile: Range request returns the first requested byte range """
    basename = os.path.basename(__file__)
    request.environ['HTTP_RANGE'] = 'bytes=10-25,-80'
    f = static_file(basename, root='./')

    # Bytes 10..25 inclusive are 16 bytes starting at offset 10.
    with open(basename, 'rb') as c:
        c.seek(10)
        expected = c.read(16)

    self.assertEqual(expected, tob('').join(f.body))
    # The size on disk equals the length of the file's bytes; no need to read it all.
    self.assertEqual('bytes 10-25/%d' % os.path.getsize(basename), f.headers['Content-Range'])
    self.assertEqual('bytes', f.headers['Accept-Ranges'])
def default():
    """Render the process environment and the request environ as HTML tables.

    Returns:
        dict with two HTML strings: ``sys_data`` (built from ``os.environ``)
        and ``req_data`` (built from ``request.environ``). Rows are sorted by
        key, and keys and values are HTML-escaped.
    """
    # Available on both Python 2 and 3, unlike cgi.escape / html.escape.
    from xml.sax.saxutils import escape

    def render_table(mapping):
        # Build one <table> with a row per key/value pair, sorted by key.
        rows = ['<table class="u-full-width"><tbody>']
        for k, v in sorted(mapping.items()):
            # Values may be non-strings (tuples, file objects): stringify first,
            # then escape, because request headers are client-controlled.
            rows.append('<tr><th>%s</th><td>%s</td></tr>' % (
                escape('%s' % (k,)),
                escape('%s' % (v,)),
            ))
        rows.append('</tbody></table>')
        return '\n'.join(rows)

    return {
        'sys_data': render_table(os.environ),
        'req_data': render_table(dict(request.environ)),
    }
def tearDown(self):
    """Roll back the session, drop the database and clear bottle's request caches."""
    self.session.rollback()
    database.drop_all()
    database.stop_engine()

    # Bottle caches the parsed JSON and the body inside the WSGI environ.
    # Remove both entries so one test's request payload cannot leak into the next.
    for memoized_key in ('bottle.request.json', 'bottle.request.body'):
        request.environ.pop(memoized_key, None)
def _set_body_json(self, data):
    """Make the current bottle request carry ``data`` as its JSON body.

    Sets ``CONTENT_LENGTH``, ``CONTENT_TYPE`` and ``wsgi.input`` in the
    request environ; the input stream is positioned at its start.
    """
    payload = tob(json.dumps(data))
    environ = request.environ
    environ['CONTENT_LENGTH'] = str(len(payload))
    environ['CONTENT_TYPE'] = 'application/json'
    # A BytesIO built from initial bytes starts at offset 0, ready to be read.
    environ['wsgi.input'] = BytesIO(payload)
def test_start_deployment_with_impersonating(self, mocked):
    """Start a deployment on environment 1 as 'impersonator' with the impersonation header set.

    The test makes no explicit assertion; it passes when the call does not raise.
    """
    users = self.session.query(m.User)
    request.account = users.filter(m.User.username == 'impersonator').one()
    request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'

    deployment_request = {
        'target': {
            'cluster': None,
            'server': None,
        },
        'branch': 'master',
        'commit': 'abcde',
    }
    self._set_body_json(deployment_request)

    api.environments_start_deployment(1, self.session)
def test_start_deployment_with_impersonating_unprivilegied_user(self, mocked):
    """Starting a deployment on environment 2 while impersonating must fail with 403."""
    request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
    request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
    self._set_body_json({
        'target': {
            'cluster': None,
            'server': None,
        },
        'branch': 'master',
        'commit': 'abcde',
    })

    with self.assertRaises(HTTPError) as cm:
        api.environments_start_deployment(2, self.session)
    # Must sit outside the ``with`` block: a statement placed after the raising
    # call inside it would never run. assertEqual replaces the deprecated
    # assertEquals alias.
    self.assertEqual(403, cm.exception.code)
def generic_handler():
    """
    The generic handler catches all requests not caught by any other route. It
    checks the configuration to see if the URL requested is one registered as a
    job's webhook URL handler. If so, it normalizes the request and queues the
    job for building.

    It returns immediately (async) with a JSON structure containing the job id.
    Requests whose path matches no job definition are aborted with a 404.
    """
    jobdef_manager = request.deps['jobdef_manager']
    build_queue = request.deps['build_queue']
    providers = request.deps['providers']

    jobdef = jobdef_manager.get_jobdef_from_url(request.path)
    if not jobdef:
        abort(404, "Not found")

    logging.info("Received event for job '%s'", jobdef.name)

    # Read the body once and reuse it for both the debug log and the job.
    raw_body = request.body.read()

    # Log debug info about the received request. Arguments are passed lazily so
    # nothing is formatted when debug logging is off.
    logging.debug("request environ: %s", request.environ)
    logging.debug("request path: %s", request.path)
    logging.debug("request method: %s", request.method)
    for k, v in request.headers.items():
        logging.debug("request header: %s=%s", k, v)
    for k, v in request.query.items():
        logging.debug("request query: %s=%s", k, v)
    logging.debug("request body: %s", raw_body)
    # NOTE(review): this writes the request's credentials to the debug log.
    logging.debug("request auth: %s", request.auth)

    env = job.make_env(request, jobdef, providers)
    job_inst = jobdef.make_job(raw_body.decode('utf8'), env)
    build_queue.put(job_inst)

    return {'id': job_inst.id}