Python flask.request module: request.url usage examples

The following 50 code examples, extracted from open-source Python projects, illustrate how to use flask.request.url.
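Before the project examples, a minimal sketch of reading request.url inside a Flask view may help; the route name and app setup below are illustrative only and are not taken from any of the listed projects. Note that request.url is a property (not a method) holding the full URL of the current request, including scheme, host, path, and query string.

from flask import Flask, request

app = Flask(__name__)

@app.route('/echo')
def echo():
    # request.url contains the complete requested URL,
    # e.g. "http://localhost:5000/echo?x=1" for a request to /echo?x=1
    return request.url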

Project: oadoi | Author: Impactstory
def bookmarklet_js():
    base_url = request.url.replace(
        "browser-tools/bookmarklet.js",
        "static/browser-tools/"
    )

    if "localhost:" not in base_url:
        # seems like this shouldn't be necessary. but i think
        # flask's request.url is coming in with http even when
        # we asked for https on the server. weird.
        base_url = base_url.replace("http://", "https://")

    rendered = render_template(
        "browser-tools/bookmarklet.js",
        base_url=base_url
    )
    resp = make_response(rendered, 200)
    resp.mimetype = "application/javascript"
    return resp
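The http-vs-https oddity noted in the comment above typically appears when Flask runs behind a reverse proxy that terminates TLS: the proxy forwards plain HTTP to the application, so request.url reports http. A common remedy, shown here as a sketch that is not part of the oadoi project and assumes the proxy sends the X-Forwarded-Proto / X-Forwarded-Host headers, is Werkzeug's ProxyFix middleware:

from flask import Flask
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)
# Trust one layer of proxy-supplied scheme/host headers so that
# request.url reflects the external https URL instead of the internal http one.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)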
Project: ipwb | Author: oduwsdl
def showTimeMap(urir, format):
    urir = getCompleteURI(urir)
    s = surt.surt(urir, path_strip_trailing_slash_unless_empty=False)
    indexPath = ipwbConfig.getIPWBReplayIndexPath()

    cdxjLinesWithURIR = getCDXJLinesWithURIR(urir, indexPath)
    tmContentType = ''
    if format == 'link':
        tm = generateLinkTimeMapFromCDXJLines(
            cdxjLinesWithURIR, s, request.url)
        tmContentType = 'application/link-format'
    elif format == 'cdxj':
        tm = generateCDXJTimeMapFromCDXJLines(
            cdxjLinesWithURIR, s, request.url)
        tmContentType = 'application/cdxj+ors'

    resp = Response(tm)
    resp.headers['Content-Type'] = tmContentType

    return resp
Project: zmirror | Author: aploium
def update_content_in_local_cache(url, content, method='GET'):
    """?? local_cache ??????, ??content
    ?stream?????"""
    if local_cache_enable and method == 'GET' and cache.is_cached(url):
        info_dict = cache.get_info(url)
        resp = cache.get_obj(url)
        resp.set_data(content)

        # An entry cached without its body has without_content set to True
        # and cannot be served directly until the content is filled in.
        # For stream responses the headers are cached first and the body arrives later,
        # so the cached entry is completed here and the flag is cleared.
        info_dict['without_content'] = False

        if verbose_level >= 4: dbgprint('LocalCache_UpdateCache', url, content[:30], len(content))
        cache.put_obj(
            url,
            resp,
            obj_size=len(content),
            expires=get_expire_from_mime(parse.mime),
            last_modified=info_dict.get('last_modified'),
            info_dict=info_dict,
        )
Project: zmirror | Author: aploium
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y

    :param no_query:
    :type full_url: str
    :param full_url: full url
    :return: str
    """
    if full_url is None:
        full_url = request.url
    split = urlsplit(full_url)
    result = split.path or "/"
    if not no_query and split.query:
        result += '?' + split.query
    return result


# ################# End Client Request Handler #################


# ################# Begin Middle Functions #################
Project: zmirror | Author: aploium
def request_remote_site():
    """
    Request the remote site (high-level wrapper); on 404/500-style errors, retry via domain_guess.
    """

    # Send the actual request to the remote (mirrored) server.
    parse.remote_response = send_request(
        parse.remote_url,
        method=request.method,
        headers=parse.client_header,
        data=parse.request_data_encoded,
    )

    if parse.remote_response.url != parse.remote_url:
        warnprint("requests's remote url", parse.remote_response.url,
                  'does no equals our rewrited url', parse.remote_url)

    if 400 <= parse.remote_response.status_code <= 599:
        # The URL may be wrong, so try to guess the correct domain
        dbgprint("Domain guessing for", request.url)
        result = guess_correct_domain()
        if result is not None:
            parse.remote_response = result
Project: arch-security-tracker | Author: archlinux
def advisory_atom():
    last_recent_entries = 15
    data = get_advisory_data()['published'][:last_recent_entries]
    feed = AtomFeed('Arch Linux Security - Recent advisories',
                    feed_url=request.url, url=request.url_root)

    for entry in data:
        advisory = entry['advisory']
        package = entry['package']
        title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)

        feed.add(title=title,
                 content=render_template('feed.html', content=advisory.content),
                 content_type='html',
                 summary=render_template('feed.html', content=advisory.impact),
                 summary_type='html',
                 author='Arch Linux Security Team',
                 url=TRACKER_ISSUE_URL.format(advisory.id),
                 published=advisory.created,
                 updated=advisory.created)
    return feed.get_response()
Project: pokedex-as-it-should-be | Author: leotok
def predict():
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['file']

    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)

    pokemon_name = None
    pokemon_desc = None
    msg = ""
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        try:
            pokemon_name = predict_mlp(file).capitalize()
            pokemon_desc = pokemon_entries.get(pokemon_name)
            msg = ""
        except Exception as e:
            pokemon_name = None
            pokemon_desc = None
            msg = str(e)

    return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
Project: presidency | Author: jayrav13
def rss_feed():

    feed = AtomFeed('White House Briefing Room Releases', feed_url=request.url, url=request.url_root)

    documents = WhiteHouse.query.order_by(WhiteHouse.document_date.desc())

    for document in documents:

        feed.add(document.title, document.tweet,
            content_type='text',
            author="@presproject2017",
            url=make_external(document.full_url),
            updated=document.document_date,
            published=document.document_date)

    return feed.get_response()
Project: PEBA | Author: dtag-dev-sec
def retrieveAlertsCountWithType():
    """ Retrieve number of alerts in timeframe (GET-Parameter time as decimal or "day") and divide into honypot types"""

    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        return jsonify(getCacheResult)

    # query ES
    else:
        # Retrieve Number of Alerts from ElasticSearch and return as xml / json
        if not request.args.get('time'):
            app.logger.error('No time GET-parameter supplied in retrieveAlertsCountWithType. Must be decimal number (in minutes) or string "day"')
            return app.config['DEFAULTRESPONSE']
        else:
            returnResult = formatAlertsCountWithType(queryAlertsCountWithType(request.args.get('time'), checkCommunityIndex(request)))
            setCache(request.url, returnResult, 13, "url")
            app.logger.debug('UNCACHED %s' % str(request.url))
            return jsonify(returnResult)
Project: PEBA | Author: dtag-dev-sec
def retrieveDatasetAlertsPerMonth():
    """ Retrieve the attacks / day in the last x days from elasticsearch
        and return as JSON for the last months, defaults to last month,
        if no GET parameter days is given
    """

    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        return jsonify(getCacheResult)

    # query ES
    else:
        if not request.args.get('days'):
            # Using default : within the last month
            returnResult = formatDatasetAlertsPerMonth(queryDatasetAlertsPerMonth(None, checkCommunityIndex(request)))
        else:
            returnResult = formatDatasetAlertsPerMonth(queryDatasetAlertsPerMonth(request.args.get('days'), checkCommunityIndex(request)))
        setCache(request.url, returnResult, 600, "url")
        return jsonify(returnResult)
Project: PEBA | Author: dtag-dev-sec
def retrieveDatasetAlertTypesPerMonth():
    """ Retrieve the attacks / day in the last x days from elasticsearch,
        split by attack group
        and return as JSON for the last x months, defaults to last month,
        if no GET parameter days is given
    """

    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        return jsonify(getCacheResult)

    # query ES
    else:
        if not request.args.get('days'):
            # Using default : within the last month
            returnResult = formatDatasetAlertTypesPerMonth(queryDatasetAlertTypesPerMonth(None, checkCommunityIndex(request)))
        else:
            returnResult = formatDatasetAlertTypesPerMonth(queryDatasetAlertTypesPerMonth(request.args.get('days'), checkCommunityIndex(request)))
        setCache(request.url, returnResult, 3600, "url")
        return jsonify(returnResult)
Project: PEBA | Author: dtag-dev-sec
def retrieveAlertStats():
    """ Retrieve combined statistics
        AlertsLastMinute, AlertsLastHour,  AlertsLast24Hours
    """

    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        return jsonify(getCacheResult)

    # query ES
    else:
        returnResult = formatAlertStats(queryAlertStats(checkCommunityIndex(request)))
        setCache(request.url, returnResult, 13, "url")
        app.logger.debug('UNCACHED %s' % str(request.url))
        return jsonify(returnResult)
Project: pyldn | Author: albertmeronyo
def get_inbox():
    pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, request.headers['Accept']))
    if not request.headers['Accept'] or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
        resp = make_response(inbox_graph.serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif request.headers['Accept'] in ACCEPTED_TYPES:
        resp = make_response(inbox_graph.serialize(format=request.headers['Accept']))
        resp.headers['Content-Type'] = request.headers['Accept']
    else:
        return 'Requested format unavailable', 415

    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST"
    resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
    resp.headers['Accept-Post'] = 'application/ld+json, text/turtle'

    return resp
Project: pyldn | Author: albertmeronyo
def get_notification(id):
    pyldnlog.debug("Requested notification data of {}".format(request.url))
    pyldnlog.debug("Headers: {}".format(request.headers))

    # Check if the named graph exists
    pyldnlog.debug("Dict key is {}".format(pyldnconf._inbox_url + id))
    if pyldnconf._inbox_url + id not in graphs:
        return 'Requested notification does not exist', 404

    if 'Accept' not in request.headers or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']:
        resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif request.headers['Accept'] in ACCEPTED_TYPES:
        resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format=request.headers['Accept']))
        resp.headers['Content-Type'] = request.headers['Accept']
    else:
        return 'Requested format unavailable', 415

    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET"

    return resp
Project: drift | Author: dgnorth
def register_extension(app):
    @app.before_request
    def add_correlation_id(*args, **kw):
        correlation_id = request.headers.get(CORRELATION_ID)
        log.debug("%s %s", request.method, request.url)
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
            if request.method != "GET":
                """
                TODO: remove sensitive information such as username/password
                """
                log.debug({
                    "message": "Tracking request",
                    "correlation_id": correlation_id,
                    "method": request.method,
                    "uri": request.url,
                    "data": request.data,
                })
        request.correlation_id = correlation_id

    @app.after_request
    def save_correlation_id(response):
        if CORRELATION_ID not in response.headers:
            response.headers[CORRELATION_ID] = getattr(request, "correlation_id", None)
        return response
Project: flask_atlassian_connect | Author: halkeye
def _provide_client_handler(self, section, name, kwargs_updator=None):
        def _wrapper(func):
            @wraps(func)
            def _handler(**kwargs):
                client_key = self.auth.authenticate(
                    request.method,
                    request.url,
                    request.headers)
                client = self.client_class.load(client_key)
                if not client:
                    abort(401)
                g.ac_client = client
                kwargs['client'] = client
                if kwargs_updator:
                    kwargs.update(kwargs_updator(**kwargs))

                ret = func(**kwargs)
                if ret is not None:
                    return ret
                return '', 204
            self._add_handler(section, name, _handler)
            return func
        return _wrapper
Project: osm-wikidata | Author: EdwardBetts
def error_mail(subject, data, r, via_web=True):
    body = '''
remote URL: {r.url}
status code: {r.status_code}

request data:
{data}

status code: {r.status_code}
content-type: {r.headers[content-type]}

reply:
{r.text}
'''.format(r=r, data=data)

    if not has_request_context():
        via_web = False

    if via_web:
        user = get_username()
        body = 'site URL: {}\nuser: {}\n'.format(request.url, user) + body

    send_mail(subject, body)
Project: osm-wikidata | Author: EdwardBetts
def announce_change(change):
    place = change.place
    body = '''
user: {change.user.username}
name: {name}
page: {url}
items: {change.update_count}
comment: {change.comment}

https://www.openstreetmap.org/changeset/{change.id}

'''.format(name=place.display_name,
           url=place.candidates_url(_external=True),
           change=change)

    send_mail('tags added: {}'.format(place.name_for_changeset), body)
Project: osm-wikidata | Author: EdwardBetts
def open_changeset_error(place, changeset, r):
    template = '''
user: {change.user.username}
name: {name}
page: {url}

sent:

{sent}

reply:

{reply}

'''
    body = template.format(name=place.display_name,
                           url=place.candidates_url(_external=True),
                           sent=changeset,
                           reply=r.text)

    send_mail('error creating changeset:' + place.name, body)
Project: osm-wikidata | Author: EdwardBetts
def log_exception(self, exc_info):
        self.logger.error("""
Path:                 %s
HTTP Method:          %s
Client IP Address:    %s
User Agent:           %s
User Platform:        %s
User Browser:         %s
User Browser Version: %s
GET args:             %s
view args:            %s
URL:                  %s
""" % (
            request.path,
            request.method,
            request.remote_addr,
            request.user_agent.string,
            request.user_agent.platform,
            request.user_agent.browser,
            request.user_agent.version,
            dict(request.args),
            request.view_args,
            request.url
        ), exc_info=exc_info)
Project: globus-sample-data-portal | Author: globus
def authenticated(fn):
    """Mark a route as requiring authentication."""
    @wraps(fn)
    def decorated_function(*args, **kwargs):
        if not session.get('is_authenticated'):
            return redirect(url_for('login', next=request.url))

        if request.path == '/logout':
            return fn(*args, **kwargs)

        if (not session.get('name') or
                not session.get('email') or
                not session.get('institution')) and request.path != '/profile':
            return redirect(url_for('profile', next=request.url))

        return fn(*args, **kwargs)
    return decorated_function
Project: guides-cms | Author: pluralsight
def login_required(func):
    """
    Decorator to require login and save URL for redirecting user after login
    """

    @wraps(func)
    def decorated_function(*args, **kwargs):
        """decorator args"""

        if not is_logged_in():
            # Save off the page so we can redirect them to what they were
            # trying to view after logging in.
            session['previously_requested_page'] = request.url

            return redirect(url_for('login'))

        return func(*args, **kwargs)

    return decorated_function
Project: guides-cms | Author: pluralsight
def set_featured_title():
    """Form POST to update featured title"""

    title = request.form['title']
    stack = request.form['stack']

    article = models.search_for_article(title, stacks=[stack], status=PUBLISHED)
    if article is None:
        flash('Cannot find published guide "%s" stack "%s"' % (title, stack),
              category='error')

        url = session.pop('previously_requested_page', None)
        if url is None:
            url = url_for('index')

        return redirect(url)

    models.set_featured_article(article)
    flash('Featured guide updated', category='info')

    return redirect(url_for('index'))
Project: guides-cms | Author: pluralsight
def get_social_redirect_url(article, share_domain):
    """
    Get social redirect url for po.st to enable all counts to follow us
    regardless of where we're hosted.
    """

    # Strip off the trailing / to avoid having two slashes together in the resulting url
    if share_domain.endswith('/'):
        share_domain = share_domain[:-1]

    redirect_url = filters.url_for_article(article)

    # Use full domain for redirect_url b/c this controls the po.st social
    # sharing numbers.  We want these numbers to stick with the domain
    # we're running on so counts go with us.
    url = url_for_domain(redirect_url, domain=share_domain)
    return strip_subfolder(url)
Project: guides-cms | Author: pluralsight
def strip_subfolder(url):
    """
    Strip off the subfolder if it exists so we always use the exact same
    share url for saving counts.
    """

    subfolder = app.config.get('SUBFOLDER', None)
    if not subfolder:
        return url

    p = urlparse.urlparse(url)

    if not p.path.startswith(subfolder):
        return url

    new_path = p.path.replace('%s' % (subfolder), '', 1)
    new_url = urlparse.ParseResult(p.scheme, p.netloc, new_path, p.params,
                                   p.query, p.fragment)
    return new_url.geturl()
Project: modern-paste | Author: LINKIWI
def require_login_frontend(only_if=True):
    """
    Same logic as the API require_login, but this decorator is intended for use for frontend interfaces.
    It returns a redirect to the login page, along with a post-login redirect_url as a GET parameter.

    :param only_if: Optionally specify a boolean condition that needs to be true for the frontend login to be required.
                    This is semantically equivalent to "require login for this view endpoint only if <condition>,
                    otherwise, no login is required"
    """
    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            if not current_user.is_authenticated and only_if:
                return redirect(UserLoginInterfaceURI.uri(redirect_url=quote(request.url, safe='')))
            return func(*args, **kwargs)
        return decorated_view
    return decorator
Project: opsweb | Author: wylok
def curl():
    form = MyForm.MyForm_input()
    if form.submit.data:
        urls = form.text.data.strip().splitlines()
        urls = set(urls)
        for url in urls:
            Purge = purge.Purged()
            if not url or url.startswith('#'):
                continue
            else:
                url = url.strip()
            if not url.startswith('http'):
                flash('url must begin with http(s)://')
                return render_template('Message_static.html',Main_Infos=g.main_infos)
            url_rep=Purge.purge_cdn(url)
            flash(url+' purge CDN '+url_rep)
        return render_template('Message_static.html',Main_Infos=g.main_infos)
    return render_template('cdn.html',form=form,Main_Infos=g.main_infos)
Project: opsweb | Author: wylok
def chart_center_traffic():
    try:
        Tra_cli_url_minute_datas = collections.OrderedDict()
        Tra_ser_url_minute_datas = collections.OrderedDict()
        for i in range(1,5):
            Tm = datetime.datetime.now() - datetime.timedelta(minutes=i)
            Tm = Tm.strftime('%H:%M')
            Tra_cli_url_minute_Key = 'traffic.cli.url_%s' % Tm
            Tra_ser_url_minute_Key = 'traffic.ser.url_%s' % Tm
            Tra_cli_url_minute_datas[Tm] = [[str(url), int(RC.zscore(Tra_cli_url_minute_Key, url)) * 8 / 1024 / 1024] for url in RC.zrevrange(Tra_cli_url_minute_Key, 0,4)]
            Tra_ser_url_minute_datas[Tm] = [[str(url), int(RC.zscore(Tra_ser_url_minute_Key,url)) * 8 / 1024 / 1024] for url in RC.zrevrange(Tra_ser_url_minute_Key, 0,4) ]
        return render_template('chart_center_traffic.html',Main_Infos=g.main_infos,Tra_cli_url_minute_datas=Tra_cli_url_minute_datas,Tra_ser_url_minute_datas=Tra_ser_url_minute_datas)
    except Exception as e:
        logging.error(e)
        flash('Failed to fetch data!')
        return render_template('Message_static.html', Main_Infos=g.main_infos)
Project: opsweb | Author: wylok
def gateway_domain():
    try:
        DATA = [eval(v) for v in RC.lrange('top_url_%s'%time.strftime('%Y-%m-%d',time.localtime()), 0, -1)]
        TOP_URL_DATA = [{'data': DATA, 'name': 'conn'}]
        values = collections.OrderedDict()
        for k in range(1,7):
            td = datetime.datetime.now()-datetime.timedelta(minutes=k)
            tt = td.strftime("%H:%M")
            tm = td.strftime('%Y%m%d%H%M')
            tables = ('domain', 'connections')  # table column headers (approximate English labels)
            httpry_Key = 'httpry_domain.%s' % tm
            values[tt] = [[url,int(RC.zscore(httpry_Key, url))] for url in RC.zrevrange(httpry_Key, 0, 10)]
        return render_template('gateway_domain.html',Main_Infos=g.main_infos,tables = tables,values=values,TOP_URL_DATA=TOP_URL_DATA)
    except Exception as e:
        logging.error(e)
        flash('Failed to fetch data!')
        return render_template('Message_static.html', Main_Infos=g.main_infos)
Project: opsweb | Author: wylok
def backup_mysql_results():
    produce.Async_log(g.user, request.url)
    try:
        if Redis.exists('finish_backup'):
            Infos = Redis.lrange('finish_backup',0,-1)
            if Infos:
                Infos = [eval(info) for info in set(Infos)]
                tt = time.strftime('%Y-%m-%d', time.localtime())
                tables = ('date', 'database', 'MySQL server', 'backup file', 'status')  # table column headers (approximate English labels)
                return render_template('backup_mysql_results.html',Main_Infos=g.main_infos,Infos=Infos,tt=tt,tables=tables)
            else:
                raise flash('Error: no finished backup information found!')
        else:
           raise flash('Error: no finished backup information found!')
    except Exception as e:
        if 'old' not in str(e):
            flash(str(e))
        return render_template('Message_static.html',Main_Infos=g.main_infos)
Project: sdos-core | Author: sdos
def get_proxy_request_url(thisAuth, thisContainer=None, thisObject=None):
    """
    create the url under which this API proxy will reach its swift back end. basically this is the request url with a different hostname
    :param thisAuth:
    :param thisContainer:
    :param thisObject:
    :return:
    """
    u = configuration.swift_store_url.format(thisAuth)
    if thisContainer:
        u += "/" + thisContainer
        if thisObject:
            u += "/" + thisObject
    return u


##############################################################################
# Frontend Pool
##############################################################################
Project: sdos-core | Author: sdos
def handle_auth():
    """
    Forward the auth request to swift
    replace the given storage url with our own:
    'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
    becomes
    'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'

    this is the first request any client makes; we passed on an auth-token from swift
    which is used in further requests
    :return:
    """
    clientHeaders = request.headers
    swiftStatus, swiftHeaders, swiftBody = httpBackend.doAuthGetToken(reqHead=clientHeaders, method="GET")
    log.debug("swift response: {} {} {}".format(swiftStatus, swiftHeaders, swiftBody))
    if 200 == swiftStatus:
        replaceStorageUrl(swiftResponse=swiftHeaders)
        log.debug("proxy response: {} {} {}".format(swiftStatus, swiftHeaders, swiftBody))
    return Response(status=swiftStatus, headers=swiftHeaders, response=swiftBody)
Project: chaos-monkey-engine | Author: BBVA
def __init__(self, **kwargs):
        """Initialises a new ``Self`` link instance. Accepts the same
        Keyword Arguments as :class:`.Link`.

        Additional Keyword Args:
            external (bool): if true, force link to be fully-qualified URL, defaults to False

        See Also:
            :class:`.Link`
        """

        url = request.url
        external = kwargs.get('external', False)
        if not external and current_app.config['SERVER_NAME'] is None:
            url = request.url.replace(request.host_url, '/')

        return super(Self, self).__init__('self', url, **kwargs)
Project: dnflow | Author: DocNow
def feed():
    searches = query(
        '''
        SELECT * FROM searches 
        WHERE published IS NOT NULL 
        ORDER BY id DESC
        ''', json=True)
    site_url = 'http://' + app.config['HOSTNAME']
    feed_url = site_url + '/feed/'
    def add_url(s):
        s['url'] = site_url + '/summary/' + s['date_path'] + '/'
        return s
    searches = map(_date_format, searches)
    searches = list(map(add_url, searches))
    resp = make_response(
        render_template(
            'feed.xml', 
            updated=searches[0]['created'],
            site_url=site_url,
            feed_url=feed_url,
            searches=searches
        )
    )
    resp.headers['Content-Type'] = 'application/atom+xml'
    return resp
Project: postmarketos.org | Author: postmarketOS
def parse_post(post, external_links=False, create_html=True):
    with open(os.path.join(BLOG_CONTENT_DIR, post)) as handle:
        raw = handle.read()
    frontmatter, content = REGEX_SPLIT_FRONTMATTER.split(raw, 2)

    data = yaml.load(frontmatter)

    y, m, d, slug = post[:-3].split('-', maxsplit=3)

    if create_html:
        data['html'] = markdown.markdown(content, extensions=[
            'markdown.extensions.extra',
            'markdown.extensions.codehilite',
            'markdown.extensions.toc'
        ])

    data['url'] = url_for('blog_post', y=y, m=m, d=d, slug=slug,
                          _external=external_links)
    data['reading_time'] = reading_time(content)

    return data
Project: honeyd-python | Author: sookyp
def get_feed():
    from mhn.common.clio import Clio
    from mhn.auth import current_user
    authfeed = mhn.config['FEED_AUTH_REQUIRED']
    if authfeed and not current_user.is_authenticated():
        abort(404)
    feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
                    url=request.url_root)
    sessions = Clio().session.get(options={'limit': 1000})
    for s in sessions:
        feedtext = u'Sensor "{identifier}" '
        feedtext += '{source_ip}:{source_port} on sensorip:{destination_port}.'
        feedtext = feedtext.format(**s.to_dict())
        feed.add('Feed', feedtext, content_type='text',
                 published=s.timestamp, updated=s.timestamp,
                 url=makeurl(url_for('api.get_session', session_id=str(s._id))))
    return feed
Project: GraphDash | Author: AmadeusITGroup
def after_request_log(response):
    name = dns_resolve(request.remote_addr)
    current_app.logger.warn(u"""[client {ip} {host}] {http} "{method} {path}" {status}
    Request:   {method} {path}
    Version:   {http}
    Status:    {status}
    Url:       {url}
    IP:        {ip}
    Hostname:  {host}
    Agent:     {agent_platform} | {agent_browser} | {agent_browser_version}
    Raw Agent: {agent}
    """.format(method=request.method,
               path=request.path,
               url=request.url,
               ip=request.remote_addr,
               host=name if name is not None else '?',
               agent_platform=request.user_agent.platform,
               agent_browser=request.user_agent.browser,
               agent_browser_version=request.user_agent.version,
               agent=request.user_agent.string,
               http=request.environ.get('SERVER_PROTOCOL'),
               status=response.status))

    return response
Project: heutagogy-backend | Author: heutagogy
def get(self):
        url = request.args.get('url')
        tags = request.args.getlist('tag')

        filters = [db.Bookmark.user == current_user.id]
        if url is not None:
            filters.append(db.Bookmark.url == urldefrag(url).url)
        filters.append(db.Bookmark.tags.contains(tags))
        result = db.Bookmark.query.filter(*filters) \
                                  .order_by(
                                      db.Bookmark.read.desc().nullsfirst(),
                                      db.Bookmark.timestamp.desc()) \
                                  .paginate()
        headers = {}
        links = []
        if result.has_next:
            last_url = update_query(request.url, {'page': result.pages})
            links.append(lh.Link(last_url, rel='last'))

        if links:
            headers['Link'] = lh.format_links(links)
        return list(map(lambda x: x.to_dict(), result.items)), 200, headers
Project: bibtex-browser | Author: frapac
def add_entry():
    """Add a new entry to the bibliography."""
    form = BiblioForm()
    if form.validate_on_submit():
        bib_entry = BiblioEntry(ID=form.ID.data,
                                ENTRYTYPE=form.typ.data,
                                authors=form.author.data,
                                title=form.title.data,
                                year=form.year.data,
                                school="",
                                publisher="",
                                keywords=form.keywords.data,
                                url=form.url.data,
                                journal=form.journal.data)

        db.session.add(bib_entry)
        user = current_user.name
        event = Event(author=user, article=form.ID.data,
                      event="ADD", time=time.time())
        db.session.add(event)
        db.session.commit()
        return redirect("/biblio")
    return redirect("/biblio")
Project: perseids-manifold | Author: RDACollectionsWG
def apidocs():
    url = urlparse(request.url)
    if ":" in url.netloc:
        host, port = url.netloc.split(":")
    else:
        host = url.netloc
        port = "80"
    base_path = url.path.replace('/apidocs','') if url.path != "/apidocs" else "/"
    schemes = [url.scheme]
    other_scheme = "https" if url.scheme is "http" else "http"
    try:
        if requests.get(other_scheme + "://" + url.netloc + url.path.replace('/apidocs', '') + "/scheme").status_code == 200:  # the 'requests' HTTP library, not flask.request
            schemes += [other_scheme]
    except:
        pass
    r = make_response(swagger.json(schemes, host, port, base_path))
    r.mimetype = 'application/json'
    return r
    # return send_from_directory("www","swagger.json")
Project: studio | Author: studioml
def tensorboard(logdir):
    port = _tensorboard_dirs.get(logdir)
    if not port:

        sock = socket.socket(socket.AF_INET)
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        sock.close()

        subprocess.Popen([
            'tensorboard',
            '--logdir=' + logdir,
            '--port=' + str(port)])
        time.sleep(5)  # wait for tensorboard to spin up
        _tensorboard_dirs[logdir] = port

    redirect_url = 'http://{}:{}'.format(
        six.moves.urllib.parse.urlparse(request.url).hostname,
        port)

    logger.debug('Redirecting to ' + redirect_url)
    return redirect(redirect_url)
Project: marvin | Author: sdss
def make_error_page(app, name, code, sentry=None, data=None, exception=None):
    ''' creates the error page dictionary for web errors '''
    shortname = name.lower().replace(' ', '_')
    error = {}
    error['title'] = 'Marvin | {0}'.format(name)
    error['page'] = request.url
    error['event_id'] = g.get('sentry_event_id', None)
    error['data'] = data
    error['name'] = name
    error['code'] = code
    error['message'] = exception.description if exception and hasattr(exception, 'description') else None
    if app.config['USE_SENTRY'] and sentry:
        error['public_dsn'] = sentry.client.get_public_dsn('https')
    app.logger.error('{0} Exception {1}'.format(name, error))
    return render_template('errors/{0}.html'.format(shortname), **error), code

# ----------------
# Error Handling
# ----------------
Project: ugc.aggregator | Author: Dreamcatcher-GIS
def weibo_nearbytimeline_wrapper():
    try:
        data = r.get(request.url)
        if data is None:
            data = weibo_service.get_all_weibo_nearby_async(
                request.args["lat"],
                request.args["lng"],
                int(request.args["starttime"]),
                int(request.args["endtime"]),
                int(request.args["range"])
            )
            data = json.dumps(weibo_service.nearby_weibo_statis_wrapper(data))
            r.set(request.url, data)
        return data
    except:
        traceback.print_exc()
        abort(404)
Project: ugc.aggregator | Author: Dreamcatcher-GIS
def get_user_flow_to_html():
    try:
        # data = None
        data = r.get(request.url)
        if data is None:
            if "ring_str" in request.args:
                ring_str = request.args["ring_str"]
            else:
                ring_str = None
            data = hotel_data_service.get_user_flow_to_html(
                    request.args["hotel_name"],
                    request.args["baseinfo_id"].encode("utf-8"),
                    int(request.args["page"]),
                    ring_str=ring_str
                )
            data = json.dumps(data)
            r.set(request.url, data)
        return data
    except:
        traceback.print_exc()
        abort(404)
Project: ugc.aggregator | Author: Dreamcatcher-GIS
def query_floorstate():
    try:
        result = r.get(request.url)
        #result = None
        if result is None:
            result = pms_service.query_floorstate(
                request.args['floornum'].encode('utf-8'),
                request.args['time'].encode('utf-8')
            )
            result = json.dumps(result,ensure_ascii=False)
            #print result
            r.set(request.url,result)
        return result
    except Exception,e:
        print e
        traceback.print_exc()
        abort(404)
Project: python-mautic | Author: divio
def callback():
    """ Step 3: Retrieving an access token.

    The user has been redirected back from the provider to your registered
    callback URL. With this redirection comes an authorization code included
    in the redirect URL. We will use that to obtain an access token.
    """

    mautic = OAuth2Session(client_id, redirect_uri=redirect_uri,
                           state=session['oauth_state'])
    token = mautic.fetch_token(token_url, client_secret=client_secret, authorization_response=request.url)

    # We use the session as a simple DB for this example.
    session['oauth_token'] = token
    update_token_tempfile(token)  # store token in /tmp/mautic_creds.json

    return redirect(url_for('.menu'))
Project: flask-openapi | Author: remcohaszing
def paths(self):
        """
        :class:`dict`: The top level :swagger:`pathsObject`.

        """
        paths = {}
        for rule in self.app.url_map.iter_rules():
            if rule.endpoint == 'static':
                continue
            log.info('Processing %r', rule)
            url, parameters = parse_werkzeug_url(rule.rule)
            paths.setdefault(url, {})
            if parameters:
                paths[url]['parameters'] = parameters
            for method in rule.methods:
                if method in ('HEAD', 'OPTIONS'):
                    # XXX Do we want to process these?
                    continue
                paths[url][method.lower()] = self._process_rule(rule)
        return paths
Project: flask-openapi | Author: remcohaszing
def deprecated(self, fn):
        """
        Mark an operation as deprecated.

        This will be exposed through the OpenAPI operation object.
        Additionally a warning will be emitted when the API is used.
        This can be configured using the ``OPENAPI_WARN_DEPRECATED``
        configuration option. This must be one of ``warn`` or ``log``.

        See :swagger:`operationDeprecated`.

        """
        fn.deprecated = True

        @functools.wraps(fn)
        def call_deprecated(*args, **kwargs):
            method = self._config('warn_deprecated')
            log_args = request.method, request.url
            if method == 'warn':
                warnings.warn(_DEPRECATION_MESSAGE % log_args,
                              DeprecationWarning)
            else:
                log.warning(_DEPRECATION_MESSAGE, *log_args)
            return fn(*args, **kwargs)
        return call_deprecated
Project: YoCoolSpotify | Author: shashanksurya
def getCurrentSpotifyPlaylistList():
    ret_list = []
    access_token = getSpotifyToken()
    if access_token:
        sp = spy.Spotify(access_token)
        results = sp.current_user()
        uri = 'spotify:user:12120746446:playlist:6ZK4Tz0ZsZuJBYyDZqlbGt'
        username = uri.split(":")[2]
        playlist_id = uri.split(":")[4]
        results = sp.user_playlist(username, playlist_id)
        for i, t in enumerate(results['tracks']['items']):
            temp = {}
            temp['img_url'] = t['track']['album']['images'][2]['url']
            temp['name'] = t['track']['name']
            temp['artist'] = t['track']['artists'][0]['name']
            temp['album'] = t['track']['album']['name']
            duration = int(t['track']['duration_ms']) / (1000 * 60)
            temp['duration'] = "{:2.2f}".format(duration)
            ret_list.append(temp)

    return ret_list
Project: YoCoolSpotify | Author: shashanksurya
def getCurrentYoutubePlaylistList():
    #keep dicts of videos in ret_list
    ret_list = []

    youtube = initYoutube()
    playlistitems_list_request = youtube.playlistItems().list(
        playlistId=constants.YOUTUBE_PLAYLIST_ID,
        part="snippet,contentDetails",
        maxResults=30
    )

    playlistitems_list_response = playlistitems_list_request.execute()
    #print(playlistitems_list_response)
    for playlist_item in playlistitems_list_response['items']:
        video_dict = {}
        #print(playlist_item)
        video_dict['name'] = playlist_item['snippet']['title']
        video_dict['img_url'] = playlist_item['snippet']['thumbnails']['default']['url']
        video_dict['artist'] = "--"
        video_dict['album'] = "--"
        video_dict['duration'] = "--:--"
        ret_list.append(video_dict)
    return ret_list