Python flask.request 模块,url_root() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.url_root(注意:url_root 是属性而不是方法,使用时不需要加括号)。

项目:arch-security-tracker    作者:archlinux    | 项目源码 | 文件源码
def advisory_atom():
    """Serve an Atom feed built from the published advisories.

    Only the first 15 entries of ``get_advisory_data()['published']`` are
    included. Content and summary of every entry are rendered through the
    ``feed.html`` template and declared as HTML.

    :return: the response produced by ``AtomFeed.get_response()``
    """
    last_recent_entries = 15
    data = get_advisory_data()['published'][:last_recent_entries]
    feed = AtomFeed('Arch Linux Security - Recent advisories',
                    feed_url=request.url, url=request.url_root)

    for entry in data:
        advisory = entry['advisory']
        package = entry['package']
        title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)

        feed.add(title=title,
                 content=render_template('feed.html', content=advisory.content),
                 content_type='html',
                 summary=render_template('feed.html', content=advisory.impact),
                 # Previously misspelled ``summary_tpe``: the unknown keyword
                 # was ignored, so the rendered HTML summary kept the default
                 # (plain text) type.
                 summary_type='html',
                 author='Arch Linux Security Team',
                 url=TRACKER_ISSUE_URL.format(advisory.id),
                 published=advisory.created,
                 updated=advisory.created)
    return feed.get_response()
项目:presidency    作者:jayrav13    | 项目源码 | 文件源码
def rss_feed():
    """Return an Atom feed with one entry per ``WhiteHouse`` document.

    Documents are queried in descending ``document_date`` order; each entry
    links to the externalised ``full_url`` of the document.
    """
    atom = AtomFeed(
        'White House Briefing Room Releases',
        feed_url=request.url,
        url=request.url_root,
    )

    newest_first = WhiteHouse.query.order_by(WhiteHouse.document_date.desc())
    for doc in newest_first:
        atom.add(
            doc.title,
            doc.tweet,
            content_type='text',
            author="@presproject2017",
            url=make_external(doc.full_url),
            updated=doc.document_date,
            published=doc.document_date,
        )

    return atom.get_response()
项目:sacredboard    作者:chovanecm    | 项目源码 | 文件源码
def run_tensorboard(run_id, tflog_id):
    """Start TensorBoard for one log directory of a run and redirect to it.

    :param run_id: identifier handed to ``data.get_run``
    :param tflog_id: index into the run's ``info.tensorflow.logdirs`` list
    :return: a redirect to ``http://<request host>:<tensorboard port>``
    """
    # No existence check: the run is assumed to be present.
    run = current_app.config["data"].get_run(run_id)
    experiment_dir = Path(run["experiment"]["base_dir"])
    log_dir = Path(run["info"]["tensorflow"]["logdirs"][tflog_id])

    # Relative log directories are resolved against the experiment's base dir.
    resolved_dir = log_dir if log_dir.is_absolute() else experiment_dir.joinpath(log_dir)

    port = int(tensorboard.run_tensorboard(str(resolved_dir)))

    # Reuse the host name from the current request; only the port differs.
    host = re.search("://([^:/]+)", request.url_root).group(1)
    return redirect("http://%s:%d" % (host, port))
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def get(self):
    """List the available API versions.

    :return: JSON response whose ``versions`` list holds the single
        ``v2`` entry with a ``self`` link.
    """
    LOG.debug("API CALL: %s GET" % str(self.__class__.__name__))
    # ``request.url_root`` already ends with "/"; strip it so the link is
    # ".../v2" rather than "...//v2".
    base_url = request.url_root.rstrip('/')
    resp = {
        'versions': [{
            "status": "CURRENT",
            "id": "v2",
            "links": [
                {
                    "href": base_url + '/v2',
                    "rel": "self"
                }
            ]
        }]
    }
    return Response(json.dumps(resp), status=200, mimetype='application/json')
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def get(self):
    """
    Lists API versions.

    :return: Returns a json with API versions.
    :rtype: :class:`flask.response`
    """
    LOG.debug("API CALL: Neutron - List API Versions")
    # ``request.url_root`` already ends with "/"; strip it so the link is
    # ".../v2.0" rather than "...//v2.0".
    base_url = request.url_root.rstrip('/')
    resp = {
        'versions': [{
            "status": "CURRENT",
            "id": "v2.0",
            "links": [
                {
                    "href": base_url + '/v2.0',
                    "rel": "self"
                }
            ]
        }]
    }

    return Response(json.dumps(resp), status=200, mimetype='application/json')
项目:honeyd-python    作者:sookyp    | 项目源码 | 文件源码
def get_feed():
    """Build an Atom feed from up to 1000 stored sessions.

    Aborts with 404 when ``FEED_AUTH_REQUIRED`` is set and the current user
    is not authenticated. The ``AtomFeed`` object itself is returned.
    """
    from mhn.common.clio import Clio
    from mhn.auth import current_user

    auth_required = mhn.config['FEED_AUTH_REQUIRED']
    if auth_required and not current_user.is_authenticated():
        abort(404)

    atom = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
                    url=request.url_root)

    template = u'Sensor "{identifier}" '
    template += '{source_ip}:{source_port} on sensorip:{destination_port}.'

    for record in Clio().session.get(options={'limit': 1000}):
        session_url = makeurl(url_for('api.get_session', session_id=str(record._id)))
        atom.add('Feed', template.format(**record.to_dict()), content_type='text',
                 published=record.timestamp, updated=record.timestamp,
                 url=session_url)
    return atom
项目:veripress    作者:veripress    | 项目源码 | 文件源码
def post(year, month, day, post_name):
    """Render a single published post.

    The relative URL is taken from ``request.path``; when the storage layer
    reports a different (fixed) relative URL the client is redirected there.
    A missing post aborts with 404.
    """
    rel_url = request.path[len('/post/'):]
    fixed_rel_url = storage.fix_post_relative_url(rel_url)
    if rel_url != fixed_rel_url:
        # Not the correct relative URL, so send the client to the fixed one.
        return redirect(request.url_root + 'post/' + fixed_rel_url)

    found = storage.get_post(rel_url, include_draft=False)
    if found is None:
        abort(404)

    entry = found.to_dict()
    del entry['raw_content']
    parsed = get_parser(found.format).parse_whole(found.raw_content)
    entry['content'], entry['toc'], entry['toc_html'] = parse_toc(parsed)
    entry['url'] = make_abs_url(found.unique_key)

    return custom_render_template(entry['layout'] + '.html', entry=entry)
项目:cozmo-python-sdk    作者:anki    | 项目源码 | 文件源码
def streaming_video(url_root):
    '''Video streaming generator function.

    Yields multipart ``--frame`` chunks, each carrying one PNG-encoded
    annotated image, for as long as ``remote_control_cozmo`` is set. When
    the SDK shuts down, a POST is sent to ``url_root + 'shutdown'``.

    :param url_root: root URL of the Flask app, used for the shutdown call
    '''
    import time  # local import: only needed for the idle wait below

    try:
        while True:
            if remote_control_cozmo:
                image = get_annotated_image()

                img_io = io.BytesIO()
                image.save(img_io, 'PNG')
                img_io.seek(0)
                yield (b'--frame\r\n'
                    b'Content-Type: image/png\r\n\r\n' + img_io.getvalue() + b'\r\n')
            else:
                # ``asyncio.sleep`` only builds a coroutine; in this plain
                # generator it was never awaited, so the loop spun without
                # pausing. Block for real instead.
                time.sleep(.1)
    except cozmo.exceptions.SDKShutdown:
        # Tell the main flask thread to shutdown
        requests.post(url_root + 'shutdown')
项目:pygameweb    作者:pygame    | 项目源码 | 文件源码
def atom():
    """Serve the news as an Atom document rendered from ``news/atom.xml``."""
    body = render_template('news/atom.xml', news=latest_news(current_session))
    atom_response = make_response(body)
    atom_response.headers['Content-Type'] = 'application/atom+xml; charset=utf-8; filename=news-ATOM'
    return atom_response

    # NOTE: the feed is rendered from a template on purpose. According to the
    # original author, building it with werkzeug.contrib.atom.AtomFeed
    # (feed.add(...) per news item, then feed.get_response()) made output
    # which crashes a feed validator.
项目:FuzzFlow    作者:talos-vulndev    | 项目源码 | 文件源码
def get(self, hash):
    """Rebuild the client archive and report where to download it.

    ``static/client.zip`` is recreated from every file below
    ``CLIENT_FOLDER`` on each call.

    :param hash: MD5 hex digest supplied by the caller
    :return: ``({"err": ...}, 400)`` when *hash* equals the digest of the
        fresh archive, otherwise ``({"url": ...}, 200)`` with its URL
    """
    path = 'static' + os.sep + 'client.zip'
    try:
        os.remove(path)
    except OSError:
        # Best effort: the archive may simply not exist yet.
        pass

    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for root, dirs, files in os.walk(CLIENT_FOLDER):
            for f in files:
                archive.write(os.path.join(root, f))

    # Read as bytes: a zip is binary data and ``hashlib.md5`` needs bytes.
    with open(path, 'rb') as archive_file:
        client = archive_file.read()

    if hash == hashlib.md5(client).hexdigest():
        return {"err": "invalid request"}, 400
    else:
        # URLs always use "/", independent of the platform's ``os.sep``.
        return {"url": request.url_root + 'static/client.zip'}, 200
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def server_netloc():
    """
    Return the network-location component of the current request's root
    URL, as parsed by ``urlparse``.
    """
    parsed_root = urlparse.urlparse(request.url_root)
    return parsed_root.netloc


#
# URLs
#
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def internal_url(path):
    """Return *path* appended to the current request's root URL."""
    base = request.url_root
    return base + path
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def root_url(path=None):
    """Return the request's root URL, with *path* appended when given."""
    base = request.url_root
    if path is None:
        return base + ""
    return base + path
项目:presidency    作者:jayrav13    | 项目源码 | 文件源码
def make_external(url):
    """Resolve *url* against the current request's root URL via ``urljoin``."""
    site_root = request.url_root
    return urljoin(site_root, url)
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def href_for(self, operation, qs=None, **kwargs):
    """
    Construct a full href for an operation against a resource.

    :param operation: the operation, forwarded to ``self.url_for``
    :param qs: the query string dictionary, if any
    :param kwargs: additional arguments for path expansion

    """
    url = urljoin(request.url_root, self.url_for(operation, **kwargs))
    # Extend an existing query string with "&", otherwise start one with "?".
    separator = "&" if "?" in url else "?"

    if not qs:
        return "{}".format(url)
    return "{}{}{}".format(url, separator, urlencode(qs))
项目:akamatsu    作者:rmed    | 项目源码 | 文件源码
def feed():
    """Return an atom feed with the 15 most recent published posts."""
    site_name = app.config.get('SITENAME', 'akamatsu')
    atom = AtomFeed('%s: Recent Posts' % site_name,
                    feed_url=request.url,
                    url=request.url_root)

    published = Post.query.filter_by(is_published=True, ghost='')
    recent = published.order_by(Post.timestamp.desc()).limit(15)

    for entry in recent:
        # unicode conversion is needed for the content
        html = markdown.render(entry.content).unescape()
        atom.add(entry.title,
                 html,
                 content_type='html',
                 author=entry.author.username,
                 url=url_for('blog.show', slug=entry.slug, _external=True),
                 updated=entry.timestamp)

    return atom.get_response()
项目:encore    作者:statgen    | 项目源码 | 文件源码
def get_sign_in_view(target):
    """Handle the Google OAuth2 sign-in page and its callbacks.

    Three cases, selected by the request's query arguments:

    * ``code`` present -- exchange it for a session, look the user up by
      e-mail and either log them in or show an error on the sign-in page;
    * ``authorize`` present -- redirect to the provider's authorize URL;
    * otherwise -- render the plain sign-in page.

    :param target: path appended to the root URL to form the redirect URI
    """
    signin_url = request.url_root + target
    config = current_app.config
    oauth_service = OAuth2Service(
        name="google",
        client_id=config["GOOGLE_LOGIN_CLIENT_ID"],
        client_secret=config["GOOGLE_LOGIN_CLIENT_SECRET"],
        authorize_url=google_params.get("authorization_endpoint"),
        base_url=google_params.get("userinfo_endpoint"),
        access_token_url=google_params.get("token_endpoint"))

    if "code" in request.args:
        token_request = {"code": request.args["code"],
                         "grant_type": "authorization_code",
                         "redirect_uri": signin_url}
        oauth_session = oauth_service.get_auth_session(
            data=token_request, decoder=json.loads)
        user_data = oauth_session.get("").json()
        user = load_user(user_data["email"])
        if not user:
            error_message = "Not an authorized user ({})".format(user_data["email"])
            return render_template("/sign_in.html", error_message=error_message)
        flask_login.login_user(user)
        return redirect(url_for("index"))

    if "authorize" in request.args:
        authorize_url = oauth_service.get_authorize_url(
            scope="email",
            response_type="code",
            prompt="select_account",
            redirect_uri=signin_url)
        return redirect(authorize_url)

    return render_template("/sign_in.html")
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def get_next_url(self):
    """Returns the URL where we want to redirect to.

    Candidates are tried in order: the ``next`` request value, the
    referrer, the fallback endpoint (if configured) and finally the
    request's root URL. The first one accepted by ``check_safe_root`` wins.
    """
    target = self.check_safe_root(request.values.get('next'))
    if target:
        return target

    target = self.check_safe_root(request.referrer)
    if target:
        return target

    if self.fallback_endpoint:
        target = self.check_safe_root(url_for(self.fallback_endpoint))
        if target:
            return target

    return request.url_root
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def check_safe_root(self, url):
    """Return *url* if it is an acceptable redirect target, else ``None``.

    A URL is accepted when no ``safe_roots`` are configured, when it lies
    inside this application (starts with the request's root URL or is a
    host-relative path), or when it starts with one of ``safe_roots``.

    :param url: candidate URL, may be ``None``
    :return: *url* unchanged, or ``None`` when it is rejected or missing
    """
    if url is None:
        return None
    if self.safe_roots is None:
        return url
    # "//host/..." and "/\host/..." start with a slash but are resolved by
    # browsers as URLs on another host, so they must not count as app-local
    # paths (open redirect). They can still match an explicit safe root.
    is_local_path = url.startswith('/') and not url.startswith(('//', '/\\'))
    if url.startswith(request.url_root) or is_local_path:
        # A URL inside the same app is deemed to always be safe
        return url
    for safe_root in self.safe_roots:
        if url.startswith(safe_root):
            return url
    return None
项目:fame    作者:certsocietegenerale    | 项目源码 | 文件源码
def csrf_protect():
    """Reject non-read-only requests whose Referer is missing or foreign.

    GET, HEAD, OPTIONS and TRACE requests pass unchecked; for any other
    method ``Forbidden`` is raised unless the ``Referer`` header is present
    and ``different_origin`` reports it as matching the request's root URL.
    """
    unchecked_methods = ('GET', 'HEAD', 'OPTIONS', 'TRACE')
    if request.method in unchecked_methods:
        return

    referer = request.headers.get('Referer')
    if referer is None or different_origin(referer, request.url_root):
        raise Forbidden(description="Referer check failed.")
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def required(self, price, **kwargs):
    """API route decorator to request payment for a resource.

    The resource price is kept in a closure. On each call the wrapper
    resolves the price, lets free (price 0) or paid requests through to
    the wrapped route, and otherwise raises ``PaymentRequiredException``
    carrying the 402 headers of every allowed payment method.

    :param price: a fixed price, or a callable invoked as
        ``price(request, *args, **kwargs)`` to compute it per request
    :param kwargs: extra options forwarded to the payment methods;
        ``server_url`` is filled in from the request when absent
    """
    def decorator(fn):
        """Validates payment and returns the original API route."""
        @wraps(fn)
        def _fn(*fn_args, **fn_kwargs):
            # Calculate resource cost
            nonlocal price
            if callable(price):
                _price = price(request, *fn_args, **fn_kwargs)
            else:
                _price = price

            # Need better way to pass server url to payment methods (FIXME)
            if 'server_url' not in kwargs:
                parsed_root = urlparse(request.url_root)
                kwargs.update({'server_url': parsed_root.scheme + '://' + parsed_root.netloc})

            # Free resources skip payment validation entirely.
            if _price == 0:
                return fn(*fn_args, **fn_kwargs)

            try:
                paid = self.contains_payment(_price, request.headers, **kwargs)
            except BadRequest as e:
                return Response(e.description, BAD_REQUEST)
            if paid:
                return fn(*fn_args, **fn_kwargs)

            # Get headers for initial 402 response
            payment_headers = {}
            for method in self.allowed_methods:
                payment_headers.update(method.get_402_headers(_price, **kwargs))
            # Accessing the .files attribute of a request
            # drains the input stream.
            request.files
            raise PaymentRequiredException(payment_headers)
        return _fn
    return decorator
项目:duckpond    作者:alexmilowski    | 项目源码 | 文件源码
def siteURL(config, request):
    """Return the configured ``SITE_URL``, or the request root without its last character.

    :param config: not used by this function
    :param request: request object providing ``url_root`` for the fallback
    """
    configured = current_app.config.get('SITE_URL')
    if configured is not None:
        return configured
    return request.url_root[0:-1]
项目:duckpond    作者:alexmilowski    | 项目源码 | 文件源码
def page_not_found(error):
    """Render the ``error.html`` template for a page that was not found.

    :param error: the error passed in by the caller (not used here)
    :return: the rendered error page
    """
    # The original expression tested the bare name ``siteURL``, which is
    # never assigned in this function, so the ``is not None`` fallback could
    # not work as intended. Read the configured value explicitly.
    site_url = current_app.config.get('SITE_URL')
    if site_url is None:
        # Fall back to the request root without its last character.
        site_url = request.url_root[0:-1]
    return render_template_string(
        generate_template(current_app.config, 'error.html'),
        siteURL=site_url,
        path=request.path,
        entry=None,
        error="I'm sorry.  I can't find that page.")
项目:duckpond    作者:alexmilowski    | 项目源码 | 文件源码
def send_doc(path):
    """Serve a document from the configured ``DOCS`` location.

    * ``DOCS`` unset -- abort with 404.
    * ``DOCS`` starts with ``http`` -- fetch ``DOCS + path``; HTML responses
      are wrapped in the ``content.html`` template, anything else is
      streamed through with the upstream headers.
    * otherwise ``DOCS`` is a directory -- ``.html`` files whose first line
      does not start with ``<!DOCTYPE`` are wrapped in ``content.html``;
      all other files are sent as-is. A missing ``.html`` file aborts
      with 404.
    """
    siteURL = current_app.config.get('SITE_URL')

    docs_location = current_app.config.get('DOCS')
    if docs_location is None:
        abort(404)

    if docs_location[0:4] == 'http':
        upstream = requests.get(docs_location + path, stream=True,
                                headers={'Connection': 'close'})
        if upstream.headers['Content-Type'][0:9] != 'text/html':
            return Response(stream_with_context(upstream.iter_content()),
                            headers=dict(upstream.headers))
        return render_template_string(
            generate_template(current_app.config, 'content.html'),
            siteURL=siteURL if siteURL is not None else request.url_root[0:-1],
            html=upstream.text,
            entry=None)

    docs_dir = os.path.abspath(docs_location)
    if not path.endswith('.html'):
        return send_from_directory(docs_dir, path)

    try:
        with open(os.path.join(docs_dir, path), mode='r', encoding='utf-8') as doc:
            first_line = doc.readline()
            # Files that declare their own doctype are sent untouched.
            if first_line.startswith('<!DOCTYPE'):
                return send_from_directory(docs_dir, path)
            fragment = first_line + doc.read()

            return render_template_string(
                generate_template(current_app.config, 'content.html'),
                siteURL=siteURL if siteURL is not None else request.url_root[0:-1],
                html=fragment,
                entry=None)
    except FileNotFoundError:
        abort(404)

    return send_from_directory(docs_dir, path)
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def ipxe_boot(node):
    """Render the iPXE configuration for *node* and return it as plain text."""
    rendered = config_renderer.ipxe.render(node, request.url_root)
    return Response(rendered, mimetype='text/plain')
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def report(node):
    """Record a report sent for *node* and commit the resulting state.

    Outside maintenance mode the ``version`` query argument becomes the
    node's active config version and a ``Provision`` row is stored with the
    request body; a non-integer version or a non-JSON content type aborts
    with 400. In every case the node's disks get ``wipe_next_boot``
    cleared, and the cluster is flagged once its etcd nodes are configured.
    """
    session = models.db.session

    if not node.maintenance_mode:
        try:
            node.active_config_version = int(request.args.get('version'))
        except (ValueError, TypeError):
            return abort(400)

        if request.content_type != 'application/json':
            return abort(400)

        record = models.Provision()
        record.node = node
        record.config_version = node.active_config_version
        record.ignition_config = request.data
        # The iPXE config is only stored when the node runs its target version.
        if node.target_config_version == node.active_config_version:
            record.ipxe_config = config_renderer.ipxe.render(node, request.url_root)
        session.add(record)

    session.add(node)

    node.disks.update({models.Disk.wipe_next_boot: False})

    cluster = node.cluster
    if cluster.are_etcd_nodes_configured:
        cluster.assert_etcd_cluster_exists = True
        session.add(cluster)

    session.commit()
    return Response('ok', mimetype='application/json')
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def get_content(self):
    """Assemble the ignition config dictionary for ``self.node``.

    Every package class is instantiated with the node and the request's
    root URL; their files, units and networkd units are concatenated in
    package order. The ``root`` and ``core`` users share the same SSH keys.
    """
    root_url = request.url_root
    packages = [package_cls(self.node, root_url) for package_cls in self.get_package_classes()]

    files = [item for pkg in packages for item in pkg.get_files()]
    units = [item for pkg in packages for item in pkg.get_units()]
    networkd_units = [item for pkg in packages for item in pkg.get_networkd_units()]
    ssh_keys = self.get_ssh_keys()

    users = [{'name': user_name, 'sshAuthorizedKeys': ssh_keys}
             for user_name in ('root', 'core')]

    return {
        'ignition': {'version': '2.0.0', 'config': {}},
        'storage': self.get_storage_config(files),
        'networkd': {'units': networkd_units},
        'passwd': {'users': users},
        'systemd': {'units': units},
    }
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def target_ipxe_config_view(self):
    """Return the rendered iPXE config of the node named by the ``id`` query argument."""
    node_id = request.args.get('id')
    node = self.get_one(node_id)
    rendered = config_renderer.ipxe.render(node, request.url_root)
    return Response(rendered, mimetype='text/plain')
项目:cozmo-python-sdk    作者:anki    | 项目源码 | 文件源码
def handle_cozmoImage():
    """Serve the camera image: a single image for Microsoft browsers, a video stream otherwise."""
    if not is_microsoft_browser(request):
        return flask_helpers.stream_video(streaming_video, request.url_root)
    return serve_single_image()
项目:SwarmOps    作者:staugur    | 项目源码 | 文件源码
def login():
    """Redirect to the index when already authenticated, else to the SSO login page.

    The SSO login URL carries the callback URL (``/sso/`` on this site), the
    project name and an md5 token derived from ``"<project>:<callback>"``.
    """
    if g.auth:
        return redirect(url_for("index"))

    # The callback URL is built separately for the query value and for the
    # token, exactly as the query dictionary has always been assembled.
    callback_url = SpliceURL.Modify(request.url_root, "/sso/").geturl
    project = SSO["SSO.PROJECT"]
    token = md5("%s:%s" %(SSO["SSO.PROJECT"], SpliceURL.Modify(request.url_root, "/sso/").geturl))
    query = {"sso": True,
             "sso_r": callback_url,
             "sso_p": project,
             "sso_t": token}

    SSOLoginURL = SpliceURL.Modify(url=SSO["SSO.URL"], path="/login/", query=query).geturl
    logger.info("User request login to SSO: %s" %SSOLoginURL)
    return redirect(SSOLoginURL)
项目:SwarmOps    作者:staugur    | 项目源码 | 文件源码
def logout():
    """Redirect to the SSO logout URL and expire the local session cookies."""
    return_to = request.url_root.strip("/")
    SSOLogoutURL = SSO.get("SSO.URL") + "/sso/?nextUrl=" + return_to

    resp = make_response(redirect(SSOLogoutURL))
    # Blank every session-related cookie and expire it immediately.
    for cookie_name in ('logged_in', 'username', 'sessionId', 'time', 'Azone'):
        resp.set_cookie(key=cookie_name, value='', expires=0)
    return resp
项目:spike    作者:nbs-system    | 项目源码 | 文件源码
def atom():
    """Return an Atom feed of the 15 rules with the highest ``sid``."""
    favicon = url_for('static', filename='favicon.ico')
    feed = AtomFeed(title='Recent rules', feed_url=request.url,
                    url=request.url_root, author='Spike', icon=favicon)

    latest = NaxsiRules.query.order_by(NaxsiRules.sid.desc()).limit(15).all()
    # ``or ()`` keeps the loop a no-op for an empty/falsy query result.
    for rule in latest or ():
        feed.add(rule.msg, str(rule),
                 updated=datetime.fromtimestamp(rule.timestamp),
                 id=rule.sid)
    return feed.get_response()
项目:QuestradeAPI_PythonWrapper    作者:pcinat    | 项目源码 | 文件源码
def authorize():
    """Start the OAuth2 flow: store the state in the session and redirect to the provider."""
    redirect_uri = __get_redirect_uri__(request.url_root)
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri)

    user_authorization_url, state = oauth.authorization_url(authorization_url)
    session['oauth_state'] = state

    return redirect(user_authorization_url)
项目:QuestradeAPI_PythonWrapper    作者:pcinat    | 项目源码 | 文件源码
def callback():
    """Finish the OAuth2 flow: fetch the token, store it and redirect to the token view."""
    redirect_uri = __get_redirect_uri__(request.url_root)
    oauth = OAuth2Session(client_id, redirect_uri=redirect_uri,
                          state=session['oauth_state'])
    token = oauth.fetch_token(token_url,
                              client_secret=client_secret,
                              authorization_response=request.url)

    __set_session_token__(token)

    return redirect(url_for('.token'))
项目:Canella-CMS    作者:mush42    | 项目源码 | 文件源码
def make_blog_feed(order=None, limit=15):
    """Build the blog's Atom feed.

    :param order: ordering passed to ``order_by``; defaults to
        ``Post.publish_date`` descending
    :param limit: maximum number of posts to include
    :return: the populated ``AtomFeed`` object
    """
    author_name = "Musharraf Omer"
    feed = AtomFeed(
        title="{} - Blog Feed".format(settings.title),
        subtitle=settings.tagline,
        feed_url=request.url,
        url=request.url_root,
        author=author_name,
        icon=None,
        logo=None,
        rights="Copyright 2000-2016 - Mushy.ltd",
    )

    order = order or Post.publish_date.desc()
    for post in Post.query.order_by(order).limit(limit).all():
        # The URL is also stored on the post object itself.
        post.url = url_for('canella-blog.post', slug=post.slug)
        tags = [{'term': tag.slug, 'label': tag.title} for tag in post.tags]
        feed.add(
            title=post.title,
            url=post.url,
            content=make_summary(post.body),
            content_type='html',
            summary=post.meta_description,
            updated=post.updated or post.created,
            author=author_name,
            published=post.publish_date,
            categories=tags,
        )
    return feed
项目:vulpy    作者:portantier    | 项目源码 | 文件源码
def delhistory():
    """Delete ``ua-history.txt`` when the referrer starts with this site's root URL.

    :return: ``"ok"`` after deletion, ``"CSRF ATTEMPT!"`` otherwise
    """
    referrer = str(request.referrer)
    if referrer.startswith(request.url_root):
        os.unlink('ua-history.txt')
        return "ok"

    return "CSRF ATTEMPT!"
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def _preemptive_unless(base_url=None, additional_unless=None):
    """Tell whether preemptive-cache recording should be skipped.

    Recording is skipped when the ``X-Preemptive-Record`` header is ``no``,
    when preemptive caching is disabled in the settings, when *base_url* is
    listed as an exception or is not an http(s) URL, or when the optional
    *additional_unless* callable says so.

    :param base_url: URL to check; defaults to the request's root URL
    :param additional_unless: optional callable consulted last
    """
    if base_url is None:
        base_url = request.url_root

    cache_enabled = settings().getBoolean(["devel", "cache", "preemptive"])
    disabled_for_root = (
        not cache_enabled
        or base_url in settings().get(["server", "preemptiveCache", "exceptions"])
        or not base_url.startswith(("http://", "https://"))
    )

    recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no"

    skip = recording_disabled or disabled_for_root
    if callable(additional_unless):
        return skip or additional_unless()
    return skip
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None):
    """Collect the request description stored for the preemptive cache.

    :param key: plugin identifier; ``"_default"`` adds no ``plugin`` entry
    :param path: request path, defaults to ``request.path``
    :param base_url: base URL, defaults to ``request.url_root``
    :param data: extra entries, or a callable returning them; a contained
        ``query_string`` is prefixed with the ``l10n`` parameter
    :param additional_request_data: optional callable whose truthy result
        is stored under ``_additional_request_data``
    :return: dictionary with ``path``, ``base_url``, ``query_string`` and
        any collected extras
    """
    if path is None:
        path = request.path
    if base_url is None:
        base_url = request.url_root

    # Resolve the language once so both query strings share the "en"
    # fallback; the second one used to dereference ``g.locale`` unguarded.
    language = g.locale.language if g.locale else "en"

    d = dict(path=path,
             base_url=base_url,
             query_string="l10n={}".format(language))

    if key != "_default":
        d["plugin"] = key

    # add data if we have any
    if data is not None:
        try:
            if callable(data):
                data = data()
            if data:
                # Work on a copy: rewriting ``query_string`` in place would
                # modify the caller's mapping and prepend the l10n parameter
                # again on every further call with the same object.
                data = dict(data)
                if "query_string" in data:
                    data["query_string"] = "l10n={}&{}".format(language, data["query_string"])
                d.update(data)
        except Exception:
            _logger.exception("Error collecting data for preemptive cache from plugin {}".format(key))

    # add additional request data if we have any
    if callable(additional_request_data):
        try:
            ard = additional_request_data()
            if ard:
                d.update(dict(
                    _additional_request_data=ard
                ))
        except Exception:
            _logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key))

    return d
项目:rsvpapp    作者:cloudyuga    | 项目源码 | 文件源码
def dict(self):
    """Return this object's id, name and email plus a ``self`` link under the request's root URL."""
    identifier = str(self._id)
    payload = {
        "_id": identifier,
        "name": self.name,
        "email": self.email,
    }
    payload["links"] = {
        "self": "{}api/rsvps/{}".format(request.url_root, identifier)
    }
    return payload
项目:bicingbot    作者:marivipelaez    | 项目源码 | 文件源码
def set_webhook():
    """
    Sets the BicingBot webhook in its Telegram Bot

    The webhook is only registered when the request's root URL starts with
    ``https``; otherwise a warning is logged and returned.

    :return: a status message describing the outcome.
    """
    response = 'Webhook configured'
    if request.url_root.startswith('https'):
        # ``request.url_root`` already ends with "/"; strip it so the webhook
        # is ".../bicingbot" rather than "...//bicingbot".
        webhook_url = '{}/bicingbot'.format(request.url_root.rstrip('/'))
        bot_response = get_bot().setWebhook(webhook_url)
        logger.debug(bot_response)
    else:
        response = 'Bad webhook: https url must be provided for webhook'
        # ``warn`` is a deprecated alias of ``warning``.
        logger.warning(response)
    return response
项目:flask_skeleton    作者:Bleezworld    | 项目源码 | 文件源码
def ImageUrlToFile(image_url):
    """Extract the stored file name from a full image URL.

    Images are stored as ad-213213.png in the db but arrive from the website
    as <root>static/<upload dir>/ad-213213.png, so the prefix is trimmed.

    :return: ``(True, filename)`` when *image_url* is exactly the upload
        prefix followed by a single path segment, else ``(False, '')``
    """
    BAD_RETURN = False, ''
    upload_dir = app.config[Constants.KEY_UPLOAD_DIR]
    prefix = request.url_root + "static/%s/" % upload_dir

    if not image_url.startswith(prefix):
        return BAD_RETURN

    filename = image_url.split('/')[-1]
    # Reject URLs with extra path segments between the prefix and the file.
    if image_url != prefix + filename:
        return BAD_RETURN
    return True, filename
项目:flask_skeleton    作者:Bleezworld    | 项目源码 | 文件源码
def FileToImageUrl(image_file):
    """Return the full URL of *image_file* inside the configured upload directory."""
    upload_dir = app.config[Constants.KEY_UPLOAD_DIR]
    relative = "static/%s/%s" % (upload_dir, image_file)
    return request.url_root + relative
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def feeds_blogs():
    """Global feed generator for latest blogposts across all projects.

    The rendered response is cached via ``current_app.cache`` with a
    timeout argument of ``60*5``.
    """
    @current_app.cache.cached(60*5)
    def render_page():
        atom = AtomFeed('Blender Cloud - Latest updates',
                        feed_url=request.url, url=request.url_root)

        # Get latest blog posts
        query = {
            'where': {'node_type': 'post', 'properties.status': 'published'},
            'embedded': {'user': 1},
            'sort': '-_created',
            'max_results': '15'
        }
        latest_posts = Node.all(query, api=system_util.pillar_api())

        # Populate the feed
        for post in latest_posts._items:
            author = post.user.fullname
            # Posts without an update timestamp fall back to their creation time.
            updated = post._updated if post._updated else post._created
            link = url_for_node(node=post)
            excerpt = post.properties.content[:500]
            body = '<p>{0}... <a href="{1}">Read more</a></p>'.format(excerpt, link)
            atom.add(post.name, str(body),
                     content_type='html',
                     author=author,
                     url=link,
                     updated=updated,
                     published=post._created)
        return atom.get_response()

    return render_page()
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def get_next_url(self):
    """Returns the URL where we want to redirect to.

    The ``next`` request value, the referrer and the fallback endpoint (if
    one is configured) are checked in that order with ``check_safe_root``;
    the request's root URL is used when none of them is accepted.
    """
    for candidate in (request.values.get('next'), request.referrer):
        accepted = self.check_safe_root(candidate)
        if accepted:
            return accepted

    if self.fallback_endpoint:
        accepted = self.check_safe_root(url_for(self.fallback_endpoint))
        if accepted:
            return accepted

    return request.url_root
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def check_safe_root(self, url):
    """Return *url* if it is an acceptable redirect target, else ``None``.

    A URL is accepted when no ``safe_roots`` are configured, when it lies
    inside this application (starts with the request's root URL or is a
    host-relative path), or when it starts with one of ``safe_roots``.

    :param url: candidate URL, may be ``None``
    :return: *url* unchanged, or ``None`` when it is rejected or missing
    """
    if url is None:
        return None
    if self.safe_roots is None:
        return url
    # "//host/..." and "/\host/..." start with a slash but are resolved by
    # browsers as URLs on another host, so they must not count as app-local
    # paths (open redirect). They can still match an explicit safe root.
    is_local_path = url.startswith('/') and not url.startswith(('//', '/\\'))
    if url.startswith(request.url_root) or is_local_path:
        # A URL inside the same app is deemed to always be safe
        return url
    for safe_root in self.safe_roots:
        if url.startswith(safe_root):
            return url
    return None
项目:appscale-cloud-storage    作者:AppScale    | 项目源码 | 文件源码
def insert_bucket(project, conn):
    """ Creates a new bucket.

    The bucket name is read from the JSON request body. A name that already
    exists is answered with a conflict error.

    Args:
        project: A string specifying a project ID.
        conn: An S3Connection instance.
    Returns:
        A JSON string representing a bucket.
    """
    bucket_info = request.get_json()
    # TODO: Do the following lookup and create under a lock.
    if conn.lookup(bucket_info['name']) is not None:
        return error('Sorry, that name is not available. '
                     'Please try a different one.', HTTP_CONFLICT)

    index_bucket(bucket_info['name'], project)

    conn.create_bucket(bucket_info['name'])

    # The HEAD bucket request does not return creation_date. Listing every
    # bucket is an inefficient way of retrieving it.
    for candidate in conn.get_all_buckets():
        if candidate.name == bucket_info['name']:
            bucket = candidate
            break
    else:
        return error('Unable to find bucket after creating it.')

    self_link = request.url_root[:-1] + url_for('get_bucket', bucket_name=bucket.name)
    payload = {
        'kind': 'storage#bucket',
        'id': bucket.name,
        'selfLink': self_link,
        'name': bucket.name,
        'timeCreated': bucket.creation_date,
        'updated': bucket.creation_date
    }
    return Response(json.dumps(payload), mimetype='application/json')
项目:appscale-cloud-storage    作者:AppScale    | 项目源码 | 文件源码
def get_bucket(bucket_name, conn):
    """ Returns metadata for the specified bucket.

    Only the ``noAcl`` projection (the default) is supported; any other
    value is answered with a not-implemented error.

    Args:
        bucket_name: A string specifying a bucket name.
        conn: An S3Connection instance.
    Returns:
        A JSON string representing a bucket.
    """
    projection = request.args.get('projection') or 'noAcl'
    if projection != 'noAcl':
        return error('projection: {} not supported.'.format(projection),
                     HTTP_NOT_IMPLEMENTED)

    # Scan the bucket listing for the requested name.
    for candidate in conn.get_all_buckets():
        if candidate.name == bucket_name:
            bucket = candidate
            break
    else:
        return error('Not Found', HTTP_NOT_FOUND)

    self_link = request.url_root[:-1] + url_for('get_bucket', bucket_name=bucket.name)
    payload = {
        'kind': 'storage#bucket',
        'id': bucket.name,
        'selfLink': self_link,
        'name': bucket.name,
        'timeCreated': bucket.creation_date,
        'updated': bucket.creation_date
    }
    return Response(json.dumps(payload), mimetype='application/json')
项目:volback    作者:jamrizzi    | 项目源码 | 文件源码
def get(self, provider):
    """Start authorization with *provider*; only ``'github'`` is accepted.

    :raises ProviderInvalid: for any other provider name
    """
    if provider != 'github':
        raise ProviderInvalid(provider)
    callback_url = request.url_root + 'api/v1/auth/callback/github'
    return github.authorize(callback=callback_url)
项目:appr    作者:app-registry    | 项目源码 | 文件源码
def _url_for(self, path):
    """Return the request's root URL followed by ``self.api_prefix`` and *path*."""
    api_root = request.url_root + self.api_prefix
    return api_root + path
项目:appr    作者:app-registry    | 项目源码 | 文件源码
def index_discovery():
    """Return the discovery HTML page carrying the ``appr-package`` meta tag.

    The tag's content is ``<Host header>/{name}`` followed by the pull URL
    template below this application's root URL; ``{name}`` is left as a
    literal placeholder in the output.
    """
    # ``request.url_root`` already ends with "/"; strip it so the template
    # below yields ".../appr/api/..." rather than "...//appr/api/...".
    host = request.url_root.rstrip('/')
    domain = request.headers['Host']
    return """<html lang="en">
    <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta name="appr-package" content="{domain}/{{name}} {host}/appr/api/v1/packages/{{name}}/pull">
    </head>
    <body>
    </body>
    </html>""".format(domain=domain, host=host)