我们从 Python 开源项目中提取了以下 13 个代码示例，用于说明如何使用 bottle.request.url（注意：url 是属性而非方法，使用时不加括号）。
def init_audit(log_group):
    """Build a decorator that sends an audit record before each handler call.

    Args:
        log_group: Prefix of the destination group; each record goes to the
            group named ``"<log_group>=<account_id>"``.

    Returns:
        A decorator for handlers whose first positional argument is
        ``account_id``. The wrapped handler's return value is passed through
        unchanged.
    """

    def audit(f):
        @functools.wraps(f)
        def handle(account_id, *args, **kw):
            # Millisecond timestamp, taken before the message is assembled.
            timestamp_ms = int(time.time() * 1000)
            details = {
                'user': request.environ.get('REMOTE_USER', ''),
                'url': request.url,
                'path': request.path,
                'method': request.method,
                'pid': os.getpid(),
                'account_id': account_id,
                'ip': request.remote_addr,
            }
            record = {
                'timestamp': timestamp_ms,
                'message': json.dumps(details),
            }
            group_name = "%s=%s" % (log_group, account_id)
            # The record is sent first; the handler only runs afterwards.
            transport.send_group(group_name, [record])
            return f(account_id, *args, **kw)

        return handle

    return audit
def test_url(self):
    """ Environ: URL building """
    # Each entry pairs a WSGI environ with the URL expected from it.
    cases = [
        # Host taken from the Host header.
        ({'HTTP_HOST': 'example.com'},
         'http://example.com/'),
        # Host taken from SERVER_NAME.
        ({'SERVER_NAME': 'example.com'},
         'http://example.com/'),
        # An explicit SERVER_PORT shows up in the URL.
        ({'SERVER_NAME': 'example.com', 'SERVER_PORT': '81'},
         'http://example.com:81/'),
        # The scheme follows wsgi.url_scheme.
        ({'wsgi.url_scheme': 'https', 'SERVER_NAME': 'example.com'},
         'https://example.com/'),
        # Script name, path and query string are all included.
        ({'HTTP_HOST': 'example.com', 'PATH_INFO': '/path',
          'QUERY_STRING': '1=b&c=d', 'SCRIPT_NAME': '/sp'},
         'http://example.com/sp/path?1=b&c=d'),
        # Spaces in script name and path come out percent-encoded.
        ({'HTTP_HOST': 'example.com', 'PATH_INFO': '/pa th',
          'SCRIPT_NAME': '/s p'},
         'http://example.com/s%20p/pa%20th'),
    ]
    for environ, expected in cases:
        request = BaseRequest(environ)
        self.assertEqual(expected, request.url)
def callback():
    """Step 3 of the OAuth2 flow: exchange the authorization code for a token.

    The provider redirects the user back to the registered callback URL and
    includes an authorization code in that redirect URL. The full request URL
    is handed to the OAuth2 session so it can obtain an access token.

    NOTE: the server name must be configured correctly for this to work. Add
    the headers at the HTTP layer -- in particular X_FORWARDED_HOST and
    X_FORWARDED_PROTO -- so that bottle can render the correct URL and links.
    """
    oauth_config = settings.SLACK_OAUTH
    session = OAuth2Session(
        oauth_config['client_id'],
        state=request.GET['state'],
    )

    # PII - update privacy policy if oauth2 token is stored.
    # The token itself is not needed: all that matters is that the user has
    # installed the app. If tokens are needed in the future, fetch them then.
    _unused_token = session.fetch_token(
        oauth_config['token_url'],
        client_secret=oauth_config['client_secret'],
        authorization_response=request.url,
    )

    redirect('/?added_to_slack=true')
def mmrz():
    """Check the login cookies and return the route dict for this page.

    Reads the ``username`` and ``password`` cookies; when ``verify_login``
    rejects them, ``redirect('/')`` is called.

    Returns:
        A copy of ``universal_ROUTE_dict`` with a ``need_https`` entry added.
    """
    username = request.get_cookie('username')
    cookie_password = request.get_cookie('password')

    # A missing or empty password cookie is treated as None; any other value
    # is passed through urllib.unquote before verification.
    if cookie_password:
        password = urllib.unquote(cookie_password)
    else:
        password = None

    if not verify_login(username, password):
        redirect('/')

    # Previously derived from the URL:
    # need_https = "localhost" not in request.url
    need_https = False

    context = dict(universal_ROUTE_dict)
    context['need_https'] = need_https
    return context
def dict_short():
    """Redirect to ``/dictionary``, carrying over the query string if any."""
    query_string = request.urlparts.query
    if query_string:
        target = '/dictionary?{0}'.format(query_string)
    else:
        target = '/dictionary'
    redirect(target)
def pre_processor():
    """Collect the values returned for every page as a dict.

    Reads the beaker session from the WSGI environ, derives user data and
    permissions from it and, for authenticated users only, queries PYLOAD for
    server status, pending captchas and update information.

    Returns:
        dict with the keys ``user``, ``status``, ``captcha``, ``perms``,
        ``url``, ``update`` and ``plugins``.
    """
    session = request.environ.get('beaker.session')
    user = parse_userdata(session)
    perms = parse_permissions(session)

    # Defaults used when the user is not authenticated.
    status = {}
    captcha = update = plugins = False

    if user["is_authenticated"]:
        status = PYLOAD.statusServer()
        update_info = PYLOAD.getInfoByPlugin("UpdateManager")
        captcha = PYLOAD.isCaptchaWaiting()

        # check if update check is available
        if update_info:
            # The flags are compared against the string "True".
            update = update_info["pyload"] == "True"
            plugins = update_info["plugins"] == "True"

    return {
        "user": user,
        'status': status,
        'captcha': captcha,
        'perms': perms,
        'url': request.url,
        'update': update,
        'plugins': plugins,
    }
def info(account_id, resource_id):
    """Return the controller's info for a resource as indented JSON.

    Args:
        account_id: Account identifier forwarded to ``controller.info``.
        resource_id: Resource identifier. Ids starting with ``sg-`` require a
            ``parent_id`` query parameter; the request is aborted with 400
            when it is missing.

    Returns:
        JSON text (``indent=2``, encoded with ``Encoder``). The response
        content type is set to ``application/json``.
    """
    params = request.query
    requires_parent = resource_id.startswith('sg-')
    if requires_parent and 'parent_id' not in params:
        abort(400, "Missing required parameter parent_id")

    # Without an explicit parent_id the resource id itself is passed.
    parent_id = params.get('parent_id', resource_id)
    result = controller.info(account_id, resource_id, parent_id)

    response.content_type = "application/json"
    return json.dumps(result, indent=2, cls=Encoder)


# this set to post to restrict permissions, perhaps another url space.
def api_url():
    """Return the request's scheme, network location and script name as a URL."""
    parts = request.urlparts
    return "%s://%s%s" % (parts.scheme, parts.netloc, request.script_name)
def error(e):
    """Render an error object as an indented JSON document.

    Args:
        e: Error object exposing ``status``, ``exception`` and ``body``.

    Returns:
        JSON text with the status plus the ``repr`` of the request URL, the
        exception and the body. The response content type is set to
        ``application/json``.
    """
    response.content_type = "application/json"
    payload = {
        "status": e.status,
        "url": repr(request.url),
        "exception": repr(e.exception),
        # "traceback": e.traceback and e.traceback.split('\n') or '',
        "body": repr(e.body),
    }
    return json.dumps(payload, indent=2)
def dispatch(url):
    """Entry point for every Ray API request; hand it to the right handler.

    Args:
        url: Ignored -- it is overwritten with ``bottle_req.path`` right
            away. The parameter is kept so the signature stays unchanged.

    Returns:
        When ``process`` returns something indexable as
        ``(body, http_status)``, the status is applied to the response and
        ``body`` is returned. Otherwise the raw result of ``process`` is
        returned as-is. After a ``RayException`` nothing is returned and the
        response status is set to the exception's ``http_code``.

    Raises:
        Any other exception raised by ``process`` is logged and re-raised.
    """
    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    # Drop a trailing slash. endswith() is safe on an empty path, whereas
    # indexing url[-1] would raise IndexError.
    if url.endswith('/'):
        url = url[:-1]

    response_code = 200
    try:
        processed = process(url, bottle_req, bottle_resp)
        try:
            from_func, http_status = processed[0], processed[1]
            bottle_resp.status = http_status
            return from_func
        except Exception:
            # Not a usable (body, status) pair: return the result untouched.
            # Only Exception is caught so that SystemExit and
            # KeyboardInterrupt are no longer swallowed here.
            return processed
    except exceptions.RayException as e:
        log.exception('ray exception: ')
        response_code = e.http_code
    except:
        # Catch-all is intentional: log whatever happened, then re-raise it
        # unchanged.
        log.exception('exception:')
        raise

    bottle_resp.status = response_code
def __handle_action(url):
    """Run the action addressed by ``url`` and return its result.

    Args:
        url: Request path, e.g. ``/api/user/123/action``.

    Returns:
        Whatever ``Action(...).process_action()`` returns.
    """
    # url e.g: /api/user/123/action
    # Four or more slashes (five or more '/'-separated parts) indicate that
    # there is an id between the endpoint and the action name.
    has_id = url.count('/') >= 4
    arg = http.param_at(url, -2) if has_id else None
    return Action(url, arg, bottle_req).process_action()
def query_hujiang(key_word):
    """Query the Hujiang mobile dictionary endpoint for ``key_word``.

    Args:
        key_word: Word to look up; it is URL-quoted into the request.

    Returns:
        ``[]`` when ``key_word`` is empty or the response body cannot be
        decoded as JSON. Otherwise the decoded JSON, with each entry's
        ``"Comment"`` and ``"PronounceJp"`` fields rewritten in place.
    """
    if not key_word:
        return []

    headers = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1',
        'Accept-Encoding': 'gzip, deflate, sdch',
    }

    url = "https://m.hujiang.com/d/dict_jp_api.ashx?type=jc&w={0}".format(urllib.quote(key_word))
    proxies = {}
    # NOTE(review): verify=False turns off TLS certificate verification for
    # this request -- confirm that this is intentional.
    response = requests.get(url, headers=headers, verify=False, proxies=proxies)
    try:
        defines = response.json()
    except:
        # NOTE(review): bare except -- any failure while decoding the body
        # results in an empty list.
        return []

    for i in range(len(defines)):
        Comment = defines[i]["Comment"]
        # Keep only the <br/>-delimited fragments that contain no ASCII
        # letters and join them with ", ".
        comments = re.findall("<br/>([^a-zA-Z]+)<br/>", Comment)
        tmp = ", ".join(comments)

        # NOTE(review): the comments that used to stand here were not in
        # English and arrived as runs of '?', so their text is lost. The
        # disabled code they accompanied is kept below.
        # tmp = Comment if not tmp else tmp
        # mch = re.search(u"(?.+??)", tmp)
        # tmp = mch.group(1) if mch else tmp

        # NOTE(review): the '?' characters inside the first and third
        # patterns below look like mis-encoded non-ASCII characters. As
        # written, the first removes spans delimited by literal question
        # marks, and the third (u"?+?") is not a valid regular expression
        # (a quantifier with nothing to repeat). Restore the intended
        # characters from the upstream source.
        tmp = re.sub(u"\?.+?\?", "", tmp)
        # Remove parenthesised spans.
        tmp = re.sub(u"\(.+?\)", "", tmp)
        tmp = re.sub(u"?+?", "", tmp)

        defines[i]["Comment"] = tmp
        # Strip square brackets from the pronunciation.
        defines[i]["PronounceJp"] = re.sub("\[|\]", "", defines[i]["PronounceJp"])

    return defines


### static files
def get_hujiang_tts():
    """Look up the TTS audio URL for a word on dict.hjenglish.com.

    Reads ``key_word`` and ``job_id`` from the request parameters, downloads
    ``http://dict.hjenglish.com/jp/jc/<quoted key_word>`` and searches the
    first ``span`` with class ``jpSound`` for a ``GetTTSVoice("...")`` call.

    Returns:
        The plain string ``"key_word is null"`` when no key word was given.
        Otherwise a JSON string with the keys ``found``, ``message_str``,
        ``tts_url`` and ``job_id``.
    """
    key_word = request.params.get('key_word', None)
    job_id = request.params.get('job_id', None)

    if not key_word:
        return "key_word is null"

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36',
        'Accept-Encoding': 'gzip, deflate, sdch',
    }

    url = "http://dict.hjenglish.com/jp/jc/" + urllib.quote(key_word)
    req = urllib2.Request(url, None, headers)

    response = urllib2.urlopen(req)
    try:
        raw_body = response.read()
    finally:
        # Release the connection even when reading fails.
        response.close()

    # gzip is requested but not guaranteed. Only gunzip when the body starts
    # with the gzip magic bytes, so an uncompressed reply no longer raises.
    if raw_body[:2] == b'\x1f\x8b':
        html = gzip.GzipFile(fileobj=StringIO.StringIO(raw_body)).read()
    else:
        html = raw_body

    soup = BeautifulSoup(html, "html.parser")

    ret_info = {
        "found": False,
        "message_str": "",
        "tts_url": "",
        "job_id": job_id,
    }

    jpSound_list = soup.select('span[class=jpSound]')
    if not jpSound_list:
        ret_info["message_str"] = "jpSound not found"
        return json.dumps(ret_info)

    jpSound = str(jpSound_list[0])
    # Same pattern as before, written as a raw string.
    mc = re.search(r'GetTTSVoice\("(.*?)"\)', jpSound)
    if not mc:
        ret_info["message_str"] = "tts_url not found"
        return json.dumps(ret_info)

    ret_info["found"] = True
    ret_info["message_str"] = "tts_url is found"
    ret_info["tts_url"] = mc.group(1)
    return json.dumps(ret_info)