我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.url_root()。
def advisory_atom(): last_recent_entries = 15 data = get_advisory_data()['published'][:last_recent_entries] feed = AtomFeed('Arch Linux Security - Recent advisories', feed_url=request.url, url=request.url_root) for entry in data: advisory = entry['advisory'] package = entry['package'] title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type) feed.add(title=title, content=render_template('feed.html', content=advisory.content), content_type='html', summary=render_template('feed.html', content=advisory.impact), summary_tpe='html', author='Arch Linux Security Team', url=TRACKER_ISSUE_URL.format(advisory.id), published=advisory.created, updated=advisory.created) return feed.get_response()
def rss_feed(): feed = AtomFeed('White House Briefing Room Releases', feed_url=request.url, url=request.url_root) documents = WhiteHouse.query.order_by(WhiteHouse.document_date.desc()) for document in documents: feed.add(document.title, document.tweet, content_type='text', author="@presproject2017", url=make_external(document.full_url), updated=document.document_date, published=document.document_date) return feed.get_response()
def run_tensorboard(run_id, tflog_id): """Launch TensorBoard for a given run ID and log ID of that run.""" data = current_app.config["data"] # optimisticaly suppose the run exists... run = data.get_run(run_id) base_dir = Path(run["experiment"]["base_dir"]) log_dir = Path(run["info"]["tensorflow"]["logdirs"][tflog_id]) # TODO ugly!!! if log_dir.is_absolute(): path_to_log_dir = log_dir else: path_to_log_dir = base_dir.joinpath(log_dir) port = int(tensorboard.run_tensorboard(str(path_to_log_dir))) url_root = request.url_root url_parts = re.search("://([^:/]+)", url_root) redirect_to_address = url_parts.group(1) return redirect("http://%s:%d" % (redirect_to_address, port))
def get(self): LOG.debug("API CALL: %s GET" % str(self.__class__.__name__)) resp = dict() resp['versions'] = dict() versions = [{ "status": "CURRENT", "id": "v2", "links": [ { "href": request.url_root + '/v2', "rel": "self" } ] }] resp['versions'] = versions return Response(json.dumps(resp), status=200, mimetype='application/json')
def get(self): """ Lists API versions. :return: Returns a json with API versions. :rtype: :class:`flask.response` """ LOG.debug("API CALL: Neutron - List API Versions") resp = dict() resp['versions'] = dict() versions = [{ "status": "CURRENT", "id": "v2.0", "links": [ { "href": request.url_root + '/v2.0', "rel": "self" } ] }] resp['versions'] = versions return Response(json.dumps(resp), status=200, mimetype='application/json')
def get_feed(): from mhn.common.clio import Clio from mhn.auth import current_user authfeed = mhn.config['FEED_AUTH_REQUIRED'] if authfeed and not current_user.is_authenticated(): abort(404) feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url, url=request.url_root) sessions = Clio().session.get(options={'limit': 1000}) for s in sessions: feedtext = u'Sensor "{identifier}" ' feedtext += '{source_ip}:{source_port} on sensorip:{destination_port}.' feedtext = feedtext.format(**s.to_dict()) feed.add('Feed', feedtext, content_type='text', published=s.timestamp, updated=s.timestamp, url=makeurl(url_for('api.get_session', session_id=str(s._id)))) return feed
def post(year, month, day, post_name): rel_url = request.path[len('/post/'):] fixed_rel_url = storage.fix_post_relative_url(rel_url) if rel_url != fixed_rel_url: return redirect(request.url_root + 'post/' + fixed_rel_url) # it's not the correct relative url, so redirect post_ = storage.get_post(rel_url, include_draft=False) if post_ is None: abort(404) post_d = post_.to_dict() del post_d['raw_content'] post_d['content'] = get_parser(post_.format).parse_whole(post_.raw_content) post_d['content'], post_d['toc'], post_d['toc_html'] = parse_toc(post_d['content']) post_d['url'] = make_abs_url(post_.unique_key) post_ = post_d return custom_render_template(post_['layout'] + '.html', entry=post_)
def streaming_video(url_root): '''Video streaming generator function''' try: while True: if remote_control_cozmo: image = get_annotated_image() img_io = io.BytesIO() image.save(img_io, 'PNG') img_io.seek(0) yield (b'--frame\r\n' b'Content-Type: image/png\r\n\r\n' + img_io.getvalue() + b'\r\n') else: asyncio.sleep(.1) except cozmo.exceptions.SDKShutdown: # Tell the main flask thread to shutdown requests.post(url_root + 'shutdown')
def atom(): """ of the news page. """ resp = render_template('news/atom.xml', news=latest_news(current_session)) response = make_response(resp) response.headers['Content-Type'] = 'application/atom+xml; charset=utf-8; filename=news-ATOM' return response # This makes output which crashes a feed validator. # from werkzeug.contrib.atom import AtomFeed # news=latest_news(current_session) # feed = AtomFeed('pygame news', feed_url=request.url, url=request.url_root) # for new in news: # feed.add(new.title, new.description_html, # content_type='html', # author='pygame', # url='https://www.pygame.org/news.html', # updated=new.datetimeon, # published=new.datetimeon) # return feed.get_response()
def get(self, hash): path = 'static' + os.sep + 'client.zip' try: os.remove(path) except: None zip = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) for root, dirs, files in os.walk(CLIENT_FOLDER): for f in files: zip.write(os.path.join(root, f)) zip.close() client = open(path).read() if hash == hashlib.md5(client).hexdigest(): return {"err": "invalid request"}, 400 else: return {"url": request.url_root + path}, 200
def server_netloc(): """ Figure out the name of the server end of the request, punting if it's the local host or not available. """ return urlparse.urlparse(request.url_root).netloc # # URLs #
def internal_url(path): return request.url_root + path
def root_url(path = None): return request.url_root + ("" if path is None else path)
def make_external(url): return urljoin(request.url_root, url)
def href_for(self, operation, qs=None, **kwargs): """ Construct an full href for an operation against a resource. :parm qs: the query string dictionary, if any :param kwargs: additional arguments for path expansion """ url = urljoin(request.url_root, self.url_for(operation, **kwargs)) qs_character = "?" if url.find("?") == -1 else "&" return "{}{}".format( url, "{}{}".format(qs_character, urlencode(qs)) if qs else "", )
def feed(): """Return an atom feed for the blog.""" feed = AtomFeed( '%s: Recent Posts' % app.config.get('SITENAME', 'akamatsu'), feed_url=request.url, url=request.url_root ) posts = ( Post.query .filter_by(is_published=True, ghost='') .order_by(Post.timestamp.desc()) .limit(15) ) for post in posts: # unicode conversion is needed for the content feed.add( post.title, markdown.render(post.content).unescape(), content_type='html', author=post.author.username, url=url_for('blog.show', slug=post.slug, _external=True), updated=post.timestamp ) return feed.get_response()
def get_sign_in_view(target): signin_url = request.url_root + target oauth_service = OAuth2Service( name="google", client_id=current_app.config["GOOGLE_LOGIN_CLIENT_ID"], client_secret=current_app.config["GOOGLE_LOGIN_CLIENT_SECRET"], authorize_url=google_params.get("authorization_endpoint"), base_url=google_params.get("userinfo_endpoint"), access_token_url=google_params.get("token_endpoint")) if "code" in request.args: oauth_session = oauth_service.get_auth_session( data={"code": request.args["code"], "grant_type": "authorization_code", "redirect_uri": signin_url}, decoder=json.loads) user_data = oauth_session.get("").json() user = load_user(user_data["email"]) if user: flask_login.login_user(user) return redirect(url_for("index")) else: error_message = "Not an authorized user ({})".format(user_data["email"]) return render_template("/sign_in.html", error_message=error_message) elif "authorize" in request.args: return redirect(oauth_service.get_authorize_url( scope="email", response_type="code", prompt="select_account", redirect_uri=signin_url)) else: return render_template("/sign_in.html")
def get_next_url(self): """Returns the URL where we want to redirect to. This will always return a valid URL. """ return ( self.check_safe_root(request.values.get('next')) or self.check_safe_root(request.referrer) or (self.fallback_endpoint and self.check_safe_root(url_for(self.fallback_endpoint))) or request.url_root )
def check_safe_root(self, url): if url is None: return None if self.safe_roots is None: return url if url.startswith(request.url_root) or url.startswith('/'): # A URL inside the same app is deemed to always be safe return url for safe_root in self.safe_roots: if url.startswith(safe_root): return url return None
def csrf_protect(): if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'): referer = request.headers.get('Referer') if referer is None or different_origin(referer, request.url_root): raise Forbidden(description="Referer check failed.")
def required(self, price, **kwargs): """API route decorator to request payment for a resource. This function stores the resource price in a closure. It will verify the validity of a payment, and allow access to the resource if the payment is successfully accepted. """ def decorator(fn): """Validates payment and returns the original API route.""" @wraps(fn) def _fn(*fn_args, **fn_kwargs): # Calculate resource cost nonlocal price _price = price(request, *fn_args, **fn_kwargs) if callable(price) else price # Need better way to pass server url to payment methods (FIXME) if 'server_url' not in kwargs: url = urlparse(request.url_root) kwargs.update({'server_url': url.scheme + '://' + url.netloc}) # Continue to the API view if payment is valid or price is 0 if _price == 0: return fn(*fn_args, **fn_kwargs) try: contains_payment = self.contains_payment(_price, request.headers, **kwargs) except BadRequest as e: return Response(e.description, BAD_REQUEST) if contains_payment: return fn(*fn_args, **fn_kwargs) else: # Get headers for initial 402 response payment_headers = {} for method in self.allowed_methods: payment_headers.update(method.get_402_headers(_price, **kwargs)) # Accessing the .files attribute of a request # drains the input stream. request.files raise PaymentRequiredException(payment_headers) return _fn return decorator
def siteURL(config,request): u = current_app.config.get('SITE_URL') return u if u is not None else request.url_root[0:-1]
def page_not_found(error): return render_template_string(generate_template(current_app.config,'error.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], path=request.path, entry=None, error="I'm sorry. I can't find that page.")
def send_doc(path): siteURL = current_app.config.get('SITE_URL') location = current_app.config.get('DOCS') if location is None: abort(404) if location[0:4]=='http': url = location + path req = requests.get(url, stream = True,headers={'Connection' : 'close'}) if req.headers['Content-Type'][0:9]=='text/html': return render_template_string(generate_template(current_app.config,'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=req.text, entry=None) else: return Response(stream_with_context(req.iter_content()), headers = dict(req.headers)) else: dir = os.path.abspath(location) if path.endswith('.html'): glob = StringIO() try: with open(os.path.join(dir,path), mode='r', encoding='utf-8') as doc: peeked = doc.readline() if peeked.startswith('<!DOCTYPE'): return send_from_directory(dir, path) glob.write(peeked) for line in doc: glob.write(line) return render_template_string(generate_template(current_app.config,'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=glob.getvalue(), entry=None) except FileNotFoundError: abort(404) return send_from_directory(dir, path)
def ipxe_boot(node): response = config_renderer.ipxe.render(node, request.url_root) return Response(response, mimetype='text/plain')
def report(node): if not node.maintenance_mode: try: node.active_config_version = int(request.args.get('version')) except (ValueError, TypeError): return abort(400) if request.content_type != 'application/json': return abort(400) provision = models.Provision() provision.node = node provision.config_version = node.active_config_version provision.ignition_config = request.data if node.target_config_version == node.active_config_version: provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root) models.db.session.add(provision) models.db.session.add(node) node.disks.update({ models.Disk.wipe_next_boot: False }) if node.cluster.are_etcd_nodes_configured: node.cluster.assert_etcd_cluster_exists = True models.db.session.add(node.cluster) models.db.session.commit() return Response('ok', mimetype='application/json')
def get_content(self): packages = [P(self.node, request.url_root) for P in self.get_package_classes()] files = list(itertools.chain.from_iterable(p.get_files() for p in packages)) units = list(itertools.chain.from_iterable(p.get_units() for p in packages)) networkd_units = list(itertools.chain.from_iterable(p.get_networkd_units() for p in packages)) ssh_keys = self.get_ssh_keys() return { 'ignition': { 'version': '2.0.0', 'config': {}, }, 'storage': self.get_storage_config(files), 'networkd': { 'units': networkd_units }, 'passwd': { 'users': [{ 'name': 'root', 'sshAuthorizedKeys': ssh_keys, }, { 'name': 'core', 'sshAuthorizedKeys': ssh_keys, }], }, 'systemd': { 'units': units }, }
def target_ipxe_config_view(self): node = self.get_one(request.args.get('id')) response = config_renderer.ipxe.render(node, request.url_root) return Response(response, mimetype='text/plain')
def handle_cozmoImage(): if is_microsoft_browser(request): return serve_single_image() return flask_helpers.stream_video(streaming_video, request.url_root)
def login(): if g.auth: return redirect(url_for("index")) else: query = {"sso": True, "sso_r": SpliceURL.Modify(request.url_root, "/sso/").geturl, "sso_p": SSO["SSO.PROJECT"], "sso_t": md5("%s:%s" %(SSO["SSO.PROJECT"], SpliceURL.Modify(request.url_root, "/sso/").geturl)) } SSOLoginURL = SpliceURL.Modify(url=SSO["SSO.URL"], path="/login/", query=query).geturl logger.info("User request login to SSO: %s" %SSOLoginURL) return redirect(SSOLoginURL)
def logout(): SSOLogoutURL = SSO.get("SSO.URL") + "/sso/?nextUrl=" + request.url_root.strip("/") resp = make_response(redirect(SSOLogoutURL)) resp.set_cookie(key='logged_in', value='', expires=0) resp.set_cookie(key='username', value='', expires=0) resp.set_cookie(key='sessionId', value='', expires=0) resp.set_cookie(key='time', value='', expires=0) resp.set_cookie(key='Azone', value='', expires=0) return resp
def atom(): feed = AtomFeed(title='Recent rules', feed_url=request.url, url=request.url_root, author='Spike', icon=url_for('static', filename='favicon.ico')) _rules = NaxsiRules.query.order_by(NaxsiRules.sid.desc()).limit(15).all() if _rules: for rule in _rules: feed.add(rule.msg, str(rule), updated=datetime.fromtimestamp(rule.timestamp), id=rule.sid) return feed.get_response()
def authorize(): questradeAPI = OAuth2Session(client_id, redirect_uri=__get_redirect_uri__(request.url_root)) user_authorization_url, state = questradeAPI.authorization_url(authorization_url) session['oauth_state'] = state return redirect(user_authorization_url)
def callback(): questradeAPI = OAuth2Session(client_id, redirect_uri=__get_redirect_uri__(request.url_root), state=session['oauth_state']) token = questradeAPI.fetch_token(token_url, client_secret=client_secret, authorization_response=request.url) __set_session_token__(token) return redirect(url_for('.token'))
def make_blog_feed(order=None, limit=15): feed = AtomFeed( title="{} - Blog Feed".format(settings.title), subtitle=settings.tagline, feed_url=request.url, url=request.url_root, author="Musharraf Omer", icon=None, logo=None, rights="Copyright 2000-2016 - Mushy.ltd", ) order = order or Post.publish_date.desc() items = Post.query.order_by(order).limit(limit).all() for item in items: item.url = url_for('canella-blog.post', slug=item.slug) feed.add( title=item.title, url=item.url, content=make_summary(item.body), content_type='html', summary=item.meta_description, updated=item.updated or item.created, author="Musharraf Omer", published=item.publish_date, categories=[{'term': t.slug, 'label': t.title} for t in item.tags] ) return feed
def delhistory(): if not str(request.referrer).startswith(request.url_root): #app.logger.info("referer:", str(request.referrer)) return "CSRF ATTEMPT!" os.unlink('ua-history.txt') return "ok"
def _preemptive_unless(base_url=None, additional_unless=None): if base_url is None: base_url = request.url_root disabled_for_root = not settings().getBoolean(["devel", "cache", "preemptive"]) \ or base_url in settings().get(["server", "preemptiveCache", "exceptions"]) \ or not (base_url.startswith("http://") or base_url.startswith("https://")) recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no" if callable(additional_unless): return recording_disabled or disabled_for_root or additional_unless() else: return recording_disabled or disabled_for_root
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None): if path is None: path = request.path if base_url is None: base_url = request.url_root d = dict(path=path, base_url=base_url, query_string="l10n={}".format(g.locale.language if g.locale else "en")) if key != "_default": d["plugin"] = key # add data if we have any if data is not None: try: if callable(data): data = data() if data: if "query_string" in data: data["query_string"] = "l10n={}&{}".format(g.locale.language, data["query_string"]) d.update(data) except: _logger.exception("Error collecting data for preemptive cache from plugin {}".format(key)) # add additional request data if we have any if callable(additional_request_data): try: ard = additional_request_data() if ard: d.update(dict( _additional_request_data=ard )) except: _logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key)) return d
def dict(self): _id = str(self._id) return { "_id": _id, "name": self.name, "email": self.email, "links": { "self": "{}api/rsvps/{}".format(request.url_root, _id) } }
def set_webhook(): """ Sets the BicingBot webhook in its Telegram Bot :return: HTTP_RESPONSE with 200 OK status and a status message. """ response = 'Webhook configured' if request.url_root.startswith('https'): bot_response = get_bot().setWebhook('{}/bicingbot'.format(request.url_root)) logger.debug(bot_response) else: response = 'Bad webhook: https url must be provided for webhook' logger.warn(response) return response
def ImageUrlToFile(image_url): """images are stored as ad-213213.png in the db. We get them from the website as /static/upload-debug/ad-213213.png so we trim them. Returns True, filepath""" BAD_RETURN = False, '' prefix = request.url_root + "static/%s/" % app.config[Constants.KEY_UPLOAD_DIR] if not image_url.startswith(prefix): return BAD_RETURN a = image_url.split('/') if not a or (image_url != prefix + a[-1]): return BAD_RETURN return True, a[-1]
def FileToImageUrl(image_file): return request.url_root + "static/%s/%s" % (app.config[Constants.KEY_UPLOAD_DIR], image_file)
def feeds_blogs(): """Global feed generator for latest blogposts across all projects""" @current_app.cache.cached(60*5) def render_page(): feed = AtomFeed('Blender Cloud - Latest updates', feed_url=request.url, url=request.url_root) # Get latest blog posts api = system_util.pillar_api() latest_posts = Node.all({ 'where': {'node_type': 'post', 'properties.status': 'published'}, 'embedded': {'user': 1}, 'sort': '-_created', 'max_results': '15' }, api=api) # Populate the feed for post in latest_posts._items: author = post.user.fullname updated = post._updated if post._updated else post._created url = url_for_node(node=post) content = post.properties.content[:500] content = '<p>{0}... <a href="{1}">Read more</a></p>'.format(content, url) feed.add(post.name, str(content), content_type='html', author=author, url=url, updated=updated, published=post._created) return feed.get_response() return render_page()
def insert_bucket(project, conn): """ Creates a new bucket. Args: project: A string specifying a project ID. conn: An S3Connection instance. Returns: A JSON string representing a bucket. """ bucket_info = request.get_json() # TODO: Do the following lookup and create under a lock. if conn.lookup(bucket_info['name']) is not None: return error('Sorry, that name is not available. ' 'Please try a different one.', HTTP_CONFLICT) index_bucket(bucket_info['name'], project) conn.create_bucket(bucket_info['name']) # The HEAD bucket request does not return creation_date. This is an # inefficient way of retrieving it. try: bucket = next(bucket for bucket in conn.get_all_buckets() if bucket.name == bucket_info['name']) except StopIteration: return error('Unable to find bucket after creating it.') bucket_url = url_for('get_bucket', bucket_name=bucket.name) response = { 'kind': 'storage#bucket', 'id': bucket.name, 'selfLink': request.url_root[:-1] + bucket_url, 'name': bucket.name, 'timeCreated': bucket.creation_date, 'updated': bucket.creation_date } return Response(json.dumps(response), mimetype='application/json')
def get_bucket(bucket_name, conn): """ Returns metadata for the specified bucket. Args: bucket_name: A string specifying a bucket name. conn: An S3Connection instance. Returns: A JSON string representing a bucket. """ projection = request.args.get('projection') or 'noAcl' if projection != 'noAcl': return error('projection: {} not supported.'.format(projection), HTTP_NOT_IMPLEMENTED) try: bucket = next(bucket for bucket in conn.get_all_buckets() if bucket.name == bucket_name) except StopIteration: return error('Not Found', HTTP_NOT_FOUND) bucket_url = url_for('get_bucket', bucket_name=bucket.name) response = { 'kind': 'storage#bucket', 'id': bucket.name, 'selfLink': request.url_root[:-1] + bucket_url, 'name': bucket.name, 'timeCreated': bucket.creation_date, 'updated': bucket.creation_date } return Response(json.dumps(response), mimetype='application/json')
def get(self, provider): if provider == 'github': return github.authorize(callback=request.url_root + 'api/v1/auth/callback/github') raise ProviderInvalid(provider)
def _url_for(self, path): return request.url_root + self.api_prefix + path
def index_discovery(): host = request.url_root domain = request.headers['Host'] return """<html lang="en"> <head> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="appr-package" content="{domain}/{{name}} {host}/appr/api/v1/packages/{{name}}/pull"> </head> <body> </body> </html>""".format(domain=domain, host=host)