Python flask.request 模块,full_path() 实例源码

我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用flask.request.full_path()

项目:Alexandria    作者:indrora    | 项目源码 | 文件源码
def needs_authentication():
    """
    Decorator: Attach this to a route and it will require that the session has been
    previously authenticated.
    """
    def auth_chk_wrapper(f):
        # This is our decoratorated function wrapper
        @wraps(f)
        def deco(*args, **kwargs):
            # If the session does not yet have an authentication 
            if not is_authenticated():
                return redirect(sign_auth_path(request.full_path))
            else:
                return f(*args, **kwargs)
        return deco
    return auth_chk_wrapper
项目:alexandria    作者:LibraryBox-Dev    | 项目源码 | 文件源码
def needs_authentication():
    """
    Decorator: Attach this to a route and it will require that the session has been
    previously authenticated.
    """
    def auth_chk_wrapper(f):
        # This is our decoratorated function wrapper
        @wraps(f)
        def deco(*args, **kwargs):
            # If the session does not yet have an authentication 
            if not is_authenticated():
                return redirect(sign_auth_path(request.full_path))
            else:
                return f(*args, **kwargs)
        return deco
    return auth_chk_wrapper
项目:enjoliver    作者:JulienBalestra    | 项目源码 | 文件源码
def metadata():
    """
    Metadata
    ---
    tags:
      - matchbox
    responses:
      200:
        description: Metadata of the current group/profile
        schema:
            type: string
    """
    matchbox_uri = application.config.get("MATCHBOX_URI")
    if matchbox_uri:
        matchbox_resp = requests.get("%s%s" % (matchbox_uri, request.full_path))
        resp = matchbox_resp.content
        matchbox_resp.close()
        return Response(resp, status=matchbox_resp.status_code, mimetype="text/plain")

    return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")
项目:MagicPress    作者:huang-zp    | 项目源码 | 文件源码
def cached(timeout=5 * 60, key='blog_view_%s'):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = key % request.full_path
            if request.method == 'POST':
                value = f(*args, **kwargs)
                cache.set(cache_key, value, timeout=timeout)
                return value
            value = cache.get(cache_key)
            if value is None:
                value = f(*args, **kwargs)
                cache.set(cache_key, value, timeout=timeout)
            return value
        return decorated_function
    return decorator
项目:Dr0p1t-Framework    作者:Exploit-install    | 项目源码 | 文件源码
def after_request(response):
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s %s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, response.status) )
    return response
项目:Dr0p1t-Framework    作者:Exploit-install    | 项目源码 | 文件源码
def exceptions(e):
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, tb) )
    return abort(500)
项目:automatron    作者:madflojo    | 项目源码 | 文件源码
def before_request():
    ''' Pre-request hander '''
    logger.debug("Incoming Web Request: {0}".format(request.full_path))
    g.dbc = connect_db(app.config)
项目:BookCloud    作者:livro-aberto    | 项目源码 | 文件源码
def internal_server_error(e):
    message = repr(e)
    trace = traceback.format_exc()
    trace = string.split(trace, '\n')
    timestamp = (datetime.fromtimestamp(time.time())
                 .strftime('%Y-%m-%d %H:%M:%S'))
    if current_user.is_authenticated:
        user = current_user.username
    else:
        user = 'anonymous'
    gathered_data = ('message: {}\n\n\n'
                     'timestamp: {}\n'
                     'ip: {}\n'
                     'method: {}\n'
                     'request.scheme: {}\n'
                     'request.full_path: {}\n'
                     'user: {}\n\n\n'
                     'trace: {}'.format(message, timestamp,
                                        request.remote_addr, request.method,
                                        request.scheme, request.full_path,
                                        user, '\n'.join(trace)))
    # send email to admin
    if app.config['TESTING']:
        print(gathered_data)
    else:
        mail_message = gathered_data
        msg = Message('Error: ' + message[:40],
                      body=mail_message,
                      recipients=[app.config['ADMIN_MAIL']])
        mail.send(msg)
        flash(_('A message has been sent to the administrator'), 'info')
    bookcloud_before_request()
    return render_template('500.html', message=gathered_data), 500
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def save_timing(name, t0):
    timing = Timing(start=t0,
                    path=request.full_path,
                    name=name,
                    seconds=time() - t0)
    database.session.add(timing)
项目:restful-api    作者:TeamGhostBuster    | 项目源码 | 文件源码
def add_comment(user, article_id):
    """
    @api {post} /article/:id/comment Post comment to an article
    @apiName Post comment to an article
    @apiGroup Comment

    @apiUse AuthorizationTokenHeader

    @apiParam {String} id Article unique ID.
    @apiParam {String} comment The comment to be added
    @apiParam {Boolean} public The privacy setting for the comment, default is True
    @apiParamExample Request (Example)
        {
            "comment": "I hate you",
            "public": "false"
        }

    @apiUse UnauthorizedAccessError
    @apiUse ResourceDoesNotExist
    @apiUse BadRequest
    """
    app.logger.info('User {} Access {}'.format(user, request.full_path))

    # Get request body
    req = RequestUtil.get_request()
    comment = req.get('comment')
    public = req.get('public')

    # Add comment
    result = MongoUtil.add_comment(user, article_id, comment, public)

    # If error occurs
    if isinstance(result, str):
        return ResponseUtil.error_response(result)

    app.logger.info('User {} Create comment {}'.format(user, result.id))
    return jsonify(JsonUtil.serialize(result))
项目:Alexandria    作者:indrora    | 项目源码 | 文件源码
def upload(path=""):
    """

    Uploads a file to a place.
    """
    # Check that uploads are OK
    if not can_upload():
        return redirect(sign_auth_path(request.full_path))
    if request.method=='POST':
        # handle uploaded files
        if not 'file' in request.files:
            abort(400) # General UA error
        file = request.files["file"]
        # We default to using the name of the file provided,
        # but allow the filename to be changed via POST details.
        fname = file.filename
        if 'name' in request.form:
            fname = request.form['name']
        safe_fname = secure_filename(fname)
        if not allowed_filename(safe_fname):
            abort(400) # General client error
        # We're handling a potentially dangerous path, better run it through
        # The flask path jointer.
        basepath=app.config.get("storage", "location")
        fullpath = safe_join(basepath, path)
        file.save(os.path.join(fullpath,fname))
        flash("File uploaded successfully")
        return redirect(url_for('browser.upload',path=path))
    else:
        return render_template("browser/upload.html",path=path,title="Upload Files")
项目:freshonions-torscraper    作者:dirtyfilthy    | 项目源码 | 文件源码
def __call__(self, f):
        @functools.wraps(f)
        def my_decorator(*args, **kwargs):
            global _is_cached
            _is_cached = True
            if _cache is None:
                return f(*args, **kwargs)
            response = _cache.get(request.full_path)
            if response is None:
                response = f(*args, **kwargs)
                _cache.set(request.path, response, self.timeout)
            _is_cached = False

            if self.render_layout:
                wrapped_response = make_response("%s%s%s" % (render_template("layout_header.html"), response, render_template("layout_footer.html")))
                if is_response(response):
                    wrapped_response.status_code = response.status_code
                    wrapped_response.headers     = response.headers
                    wrapped_response.status      = response.status
                    wrapped_response.mimetype    = response.mimetype
                return wrapped_response
            else:
                return response

        functools.update_wrapper(my_decorator, f)
        return my_decorator
项目:freshonions-torscraper    作者:dirtyfilthy    | 项目源码 | 文件源码
def setup_session():

    session.permanent = True
    app.permanent_session_lifetime = timedelta(days=365*30)
    if not 'uuid' in session:
        session['uuid'] = str(uuid.uuid4())
        g.uuid_is_fresh = True
    else:
        g.uuid_is_fresh = False
    now = datetime.now()

    referrer  = request.headers.get('Referer', '')
    path      = request.path
    full_path = request.full_path
    agent     = request.headers.get('User-Agent', '')

    if agent in BLACKLIST_AGENT or len(agent) < 15:
        g.request_log_id = 0
        return render_template('error.html',code=200,message="Layer 8 error. If you want my data, DON'T SCRAPE (too much cpu load), contact me and I will give it to you"), 200

    with db_session:
        req_log   = RequestLog( uuid=session['uuid'], 
                                uuid_is_fresh=g.uuid_is_fresh, 
                                created_at=now, 
                                agent=agent,
                                referrer=referrer,
                                path=path,
                                full_path=full_path)
        flush()
        g.request_log_id = req_log.id
项目:opennms_alarmforwarder    作者:NETHINKS    | 项目源码 | 文件源码
def login_required(func):
        """decorator for login required
        This decorator can be used for declaring a function
        which requires authentication
        """
        @wraps(func)
        def handle_login_required(*args, **kwargs):
            # try to get username from session
            try:
                username = session["username"]
                return func(*args, **kwargs)
            except:
                pass

            # allow HTTP basic auth
            try:
                user_basic = request.authorization.username
                password_basic = request.authorization.password
                authprovider = security.AuthenticationProvider.get_authprovider()
                if authprovider.authenticate(user_basic, password_basic):
                    return func(*args, **kwargs)
            except:
                pass

            # on error: redirect to login page or return HTTP/401
            if json_check():
                return json_error("Unauthenticated", 401)
            session["redirect"] = request.full_path
            return redirect("/login")
        return handle_login_required
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def before_request():
    logger.info('feedback request', url_path=request.full_path)

    metadata = get_metadata(current_user)
    if metadata:
        logger.bind(tx_id=metadata['tx_id'])
        g.schema_json = load_schema_from_metadata(metadata)
项目:eq-survey-runner    作者:ONSdigital    | 项目源码 | 文件源码
def before_request():
    metadata = get_metadata(current_user)
    if metadata:
        logger.bind(tx_id=metadata['tx_id'])

    values = request.view_args
    logger.bind(eq_id=values['eq_id'], form_type=values['form_type'],
                ce_id=values['collection_id'])
    logger.info('questionnaire request', method=request.method, url_path=request.full_path)

    if metadata:
        g.schema_json = load_schema_from_metadata(metadata)
        _check_same_survey(values['eq_id'], values['form_type'], values['collection_id'])
项目:alexandria    作者:LibraryBox-Dev    | 项目源码 | 文件源码
def upload(path=""):
    """

    Uploads a file to a place.
    """
    # Check that uploads are OK
    if not can_upload():
        return redirect(sign_auth_path(request.full_path))
    if request.method=='POST':
        # handle uploaded files
        if not 'file' in request.files:
            abort(400) # General UA error
        file = request.files["file"]
        # We default to using the name of the file provided,
        # but allow the filename to be changed via POST details.
        fname = file.filename
        if 'name' in request.form:
            fname = request.form['name']
        safe_fname = secure_filename(fname)
        if not allowed_filename(safe_fname):
            abort(400) # General client error
        # We're handling a potentially dangerous path, better run it through
        # The flask path jointer.
        basepath=app.config.get("storage", "location")
        fullpath = safe_join(basepath, path)
        file.save(os.path.join(fullpath,fname))
        flash("File uploaded successfully")
        return redirect(url_for('browser.upload',path=path))
    else:
        return render_template("browser/upload.html",path=path,title="Upload Files")
项目:mmwatch    作者:Zverik    | 项目源码 | 文件源码
def cached(timeout=1 * 60, key='view'):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = '{}/{}'.format(key, request.full_path)
            rv = cache.get(cache_key)
            if rv is not None:
                return rv
            rv = f(*args, **kwargs)
            cache.set(cache_key, rv, timeout=timeout)
            return rv
        return decorated_function
    return decorator
项目:knowledge-repo    作者:airbnb    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
            if not current_app.config.get('INDEXING_ENABLED', True):
                return self._route(*args, **kwargs)

            log = PageView(
                page=request.full_path,
                endpoint=request.endpoint,
                user_id=current_user.id,
                ip_address=request.remote_addr,
                version=__version__
            )
            errorlog = None
            log.object_id, log.object_type, log.object_action, reextract_after_request = self.extract_objects(*args, **kwargs)
            db_session.add(log)  # Add log here to ensure pageviews are accurate

            try:
                return self._route(*args, **kwargs)
            except Exception as e:
                db_session.rollback()  # Ensure no lingering database changes remain after crashed route
                db_session.add(log)
                errorlog = ErrorLog.from_exception(e)
                db_session.add(errorlog)
                db_session.commit()
                raise_with_traceback(e)
            finally:
                # Extract object id and type after response generated (if requested) to ensure
                # most recent data is collected
                if reextract_after_request:
                    log.object_id, log.object_type, log.object_action, _ = self.extract_objects(*args, **kwargs)

                if errorlog is not None:
                    log.id_errorlog = errorlog.id
                db_session.add(log)
                db_session.commit()
项目:PyHub    作者:521xueweihan    | 项目源码 | 文件源码
def after_request(response):
    logger.info('%s %s %s %s %s', request.remote_addr, request.method,
                request.scheme, request.full_path, response.status)
    return response
项目:PyHub    作者:521xueweihan    | 项目源码 | 文件源码
def exceptions(e):
    tb = traceback.format_exc()
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
        request.remote_addr, request.method,
        request.scheme, request.full_path, tb)
    return e.status_code
项目:hellogithub.com    作者:521xueweihan    | 项目源码 | 文件源码
def after_request(response):
    logger.info('%s %s %s %s %s', request.method,
                request.environ.get('HTTP_X_REAL_IP', request.remote_addr),
                request.scheme, request.full_path, response.status)
    return response
项目:hellogithub.com    作者:521xueweihan    | 项目源码 | 文件源码
def exceptions(e):
    tb = traceback.format_exc()
    tb = tb.decode('utf-8')
    logger.error('%s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
                 request.environ.get('HTTP_X_REAL_IP', request.remote_addr),
                 request.method, request.scheme, request.full_path, tb)
    return '500 INTERNAL SERVER ERROR', 500
项目:extranet    作者:demaisj    | 项目源码 | 文件源码
def _before_request(self):
        if self._has_token_checker is None or self.token_view is None:
            return

        if request.endpoint != self.token_view and request.endpoint not in self.token_view_overrides and self._has_token_checker() is not None and not self._has_token_checker():
            return redirect(url_for(self.token_view, next=request.full_path))
项目:shorty    作者:PadamSethia    | 项目源码 | 文件源码
def after_request(response):
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    logger.error('%s %s %s %s %s %s',timestamp , request.remote_addr , \
                request.method , request.scheme , request.full_path , response.status)
    return response
项目:shorty    作者:PadamSethia    | 项目源码 | 文件源码
def exceptions(e):
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
        timestamp, request.remote_addr, request.method,
        request.scheme, request.full_path, tb)
    return make_response(e , 405)
项目:enjoliver    作者:JulienBalestra    | 项目源码 | 文件源码
def ipxe():
    """
    iPXE
    ---
    tags:
      - matchbox
    responses:
      200:
        description: iPXE script
        schema:
            type: string
      404:
        description: Not valid
        schema:
            type: string
    """
    app.logger.info("%s %s" % (request.method, request.url))
    try:
        matchbox_resp = requests.get(
            "%s%s" % (
                app.config["MATCHBOX_URI"],
                request.full_path))
        matchbox_resp.close()
        response = matchbox_resp.content.decode()

        mac = request.args.get("mac")
        if mac:
            repositories.machine_state.update(mac.replace("-", ":"), MachineStates.booting)

        return Response(response, status=200, mimetype="text/plain")

    except requests.exceptions.ConnectionError:
        app.logger.warning("404 for /ipxe")
        return "404", 404
项目:Penny-Dreadful-Tools    作者:PennyDreadfulMTG    | 项目源码 | 文件源码
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
    if content is None or content == '':
        return None
    body = ''
    if '\n' in content:
        title, body = content.split('\n', 1)
        body += '\n\n'
    else:
        title = content
    body += 'Reported on {location} by {author}'.format(location=location, author=author)
    if request:
        body += textwrap.dedent("""
            --------------------------------------------------------------------------------
            Request Method: {method}
            Path: {full_path}
            Cookies: {cookies}
            Endpoint: {endpoint}
            View Args: {view_args}
            Person: {id}
            User-Agent: {user_agent}
            Referrer: {referrer}
        """.format(method=request.method, full_path=request.full_path, cookies=request.cookies, endpoint=request.endpoint, view_args=request.view_args, id=session.get('id', 'logged_out'), user_agent=request.headers.get('User-Agent'), referrer=request.referrer))
    print(title + '\n' + body)
    # Only check for github details at the last second to get log output even if github not configured.
    if not configuration.get('github_user') or not configuration.get('github_password'):
        return None
    g = Github(configuration.get('github_user'), configuration.get('github_password'))
    repo = g.get_repo(repo)
    issue = repo.create_issue(title=title, body=body)
    return issue
项目:SynDBB    作者:D2K5    | 项目源码 | 文件源码
def after_request(response):
        timestamp = strftime('%Y-%b-%d %H:%M:%S')
        useragent = request.user_agent
        logger.info('[%s] [%s] [%s] [%s] [%s] [%s] [%s] [%s]',
            timestamp, request.remote_addr, useragent, request.method,
            request.scheme, request.full_path, response.status, request.referrer)
        return response
项目:SynDBB    作者:D2K5    | 项目源码 | 文件源码
def exceptions(e):
        tb = traceback.format_exc()
        timestamp = strftime('[%Y-%b-%d %H:%M]')
        logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
            timestamp, request.remote_addr, request.method,
            request.scheme, request.full_path, tb)
        return e.status_code

#Run the main app...
项目:MagicPress    作者:huang-zp    | 项目源码 | 文件源码
def key_prefix():
    cache_key = 'view_%s' % request.full_path
    return cache_key
项目:Dr0p1t-Framework    作者:D4Vinci    | 项目源码 | 文件源码
def after_request(response):
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s %s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, response.status) )
    return response
项目:Dr0p1t-Framework    作者:D4Vinci    | 项目源码 | 文件源码
def exceptions(e):
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    f = open("server.log","a").write( "\n"+"--"*10+"\n"+'%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s'%(timestamp, request.remote_addr, request.method, request.scheme, request.full_path, tb) )
    return abort(500)
项目:FlaskService    作者:b96705008    | 项目源码 | 文件源码
def get_api(config, tools, models):
    # config
    API_NAME = config.get('api_journey', 'name')
    DEFAULT_PAGE_SIZE = config.get('api_journey', 'page_size')
    CACHE_TIMEOUT_SECS = config.getint('cache', 'timeout')
    cache = tools['cache']
    mongo = tools['mongo']
    auth = tools['auth']
    event_model = models['events']

    # controller
    ctrler = Blueprint(API_NAME, __name__)

    def make_cache_key():
      """Make a key that includes GET parameters."""
      return request.full_path

    # route
    @ctrler.route('/actors/<actor_id>/events', methods=['GET'])
    @cache.cached(key_prefix=make_cache_key, timeout=CACHE_TIMEOUT_SECS)
    def get_actor_events(actor_id):
        pagesize = int(request.args.get('_page_size', default=DEFAULT_PAGE_SIZE))
        page = int(request.args.get('_page', default=1))

        # parse condition
        cond = parse_query_to_mongo_cond(request.args, {
                'actor.id': actor_id
            })
        output = event_model.query_by_page(cond, pagesize, page)
        return jsonify(output)


    @ctrler.route('/events', methods=['GET'])
    @auth.login_required
    def list_all_events():
        return jsonify({'events': [e for e in event_model.find()]})


    @ctrler.route('/events', methods=['POST'])
    def send_new_event():
        # async operation
        pub = tools['pubsub']['producer']
        event = request.get_json()
        pub.publish(event)
        return jsonify({'status': 'message is sent to backend for processing', 'event': event}), 201


    return {'prefix': '/'+API_NAME, 'ctrler': ctrler}
项目:enjoliver    作者:JulienBalestra    | 项目源码 | 文件源码
def ignition():
    """
    Ignition
    ---
    tags:
      - matchbox
    responses:
      200:
        description: Ignition configuration
        schema:
            type: dict
      403:
        description: Matchbox unavailable
        schema:
            type: text/plain
      503:
        description: Matchbox is out of sync
        schema:
            type: text/plain
    """
    cache_key = "sync-notify"
    last_sync_ts = CACHE.get(cache_key)
    app.logger.debug("cacheKey: %s is set with value %s" % (cache_key, last_sync_ts))

    # we ignore the sync status if we arrived from /ignition-pxe because it's a discovery PXE boot
    if last_sync_ts is None and request.path != "/ignition-pxe":
        app.logger.error("matchbox state is out of sync: cacheKey: %s is None" % cache_key)
        return Response("matchbox is out of sync", status=503, mimetype="text/plain")

    if request.path == "/ignition-pxe":
        app.logger.info("%s %s" % (request.method, request.url))

    matchbox_uri = application.config.get("MATCHBOX_URI")
    if matchbox_uri:
        try:
            # remove the -pxe from the path because matchbox only serve /ignition
            path = request.full_path.replace("/ignition-pxe?", "/ignition?")
            matchbox_resp = requests.get("%s%s" % (matchbox_uri, path))
            resp = matchbox_resp.content
            matchbox_resp.close()
            return Response(resp, status=matchbox_resp.status_code, mimetype="text/plain")
        except requests.RequestException as e:
            app.logger.error("fail to query matchbox ignition %s" % e)
            return Response("matchbox doesn't respond", status=502, mimetype="text/plain")

    return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")