我们从 Python 开源项目中提取了以下 35 个代码示例，用于说明如何使用 flask.request.full_path（注意：full_path 是属性而不是方法，使用时不要加括号）。
def needs_authentication():
    """
    Decorator factory: attach the returned decorator to a route and the route
    body only runs when ``is_authenticated()`` is truthy. Otherwise the
    client is redirected to ``sign_auth_path(request.full_path)``.
    """
    def auth_chk_wrapper(f):
        @wraps(f)
        def deco(*args, **kwargs):
            # Authenticated sessions go straight through to the wrapped view.
            if is_authenticated():
                return f(*args, **kwargs)
            # Everyone else is sent to the sign-in path for this URL.
            return redirect(sign_auth_path(request.full_path))
        return deco
    return auth_chk_wrapper
def metadata():
    """
    Metadata
    ---
    tags:
      - matchbox
    responses:
      200:
        description: Metadata of the current group/profile
        schema:
            type: string
    """
    matchbox_uri = application.config.get("MATCHBOX_URI")

    # Without a configured matchbox there is nothing to relay to.
    if not matchbox_uri:
        return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")

    # Forward the incoming path and query string to matchbox and relay
    # its body and status code back to the caller.
    upstream = requests.get("%s%s" % (matchbox_uri, request.full_path))
    payload = upstream.content
    upstream.close()
    return Response(payload, status=upstream.status_code, mimetype="text/plain")
def cached(timeout=5 * 60, key='blog_view_%s'):
    """
    Decorator factory that caches a view's return value in ``cache``.

    The cache key is ``key % request.full_path``. POST requests always call
    the view and overwrite the cached entry; other methods return the cached
    entry when one exists and populate it otherwise.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = key % request.full_path

            # Only non-POST requests are allowed to be served from the cache.
            if request.method != 'POST':
                hit = cache.get(cache_key)
                if hit is not None:
                    return hit

            # POST, or a cache miss: compute and store a fresh value.
            fresh = f(*args, **kwargs)
            cache.set(cache_key, fresh, timeout=timeout)
            return fresh
        return decorated_function
    return decorator
def after_request(response):
    """
    Append one access-log entry for the current request to ``server.log``.

    The entry holds a timestamp, the remote address, the HTTP method, the
    scheme, ``request.full_path`` and the response status.

    :param response: the response object about to be sent
    :return: ``response``, unchanged
    """
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    entry = "\n" + "--" * 10 + "\n" + '%s %s %s %s %s %s' % (
        timestamp, request.remote_addr, request.method,
        request.scheme, request.full_path, response.status)
    # Use a context manager so the file handle is closed after every request
    # instead of being left for the garbage collector.
    with open("server.log", "a") as log_file:
        log_file.write(entry)
    return response
def exceptions(e):
    """
    Append an error entry, including the current traceback, to ``server.log``
    and abort the request with HTTP 500.

    :param e: the exception being handled (not used directly; the traceback
        is taken from ``traceback.format_exc()``)
    """
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    entry = "\n" + "--" * 10 + "\n" + '%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s' % (
        timestamp, request.remote_addr, request.method,
        request.scheme, request.full_path, tb)
    # Use a context manager so the file handle is always closed, even though
    # abort() below leaves this function by raising.
    with open("server.log", "a") as log_file:
        log_file.write(entry)
    return abort(500)
def before_request():
    '''Pre-request handler: log the incoming path and store a DB connection on ``g.dbc``.'''
    incoming = request.full_path
    logger.debug("Incoming Web Request: {0}".format(incoming))
    g.dbc = connect_db(app.config)
def internal_server_error(e):
    """
    Collect diagnostic data about a failed request, report it, and render
    the 500 page.

    The report contains ``repr(e)``, a timestamp, the client address, the
    request method/scheme/full path, the current username (or
    ``'anonymous'``) and the traceback. When ``app.config['TESTING']`` is
    set the report is printed; otherwise it is mailed to
    ``app.config['ADMIN_MAIL']`` and a flash message is queued.

    :param e: the exception being handled
    :return: ``(rendered '500.html', 500)``
    """
    message = repr(e)
    # format_exc() already returns a single newline-separated string. The
    # previous ``string.split(trace, '\n')`` followed by ``'\n'.join(trace)``
    # was a no-op round trip, and ``string.split`` no longer exists on
    # Python 3, so the traceback text is used directly.
    trace = traceback.format_exc()
    timestamp = (datetime.fromtimestamp(time.time())
                 .strftime('%Y-%m-%d %H:%M:%S'))
    if current_user.is_authenticated:
        user = current_user.username
    else:
        user = 'anonymous'
    gathered_data = ('message: {}\n\n\n'
                     'timestamp: {}\n'
                     'ip: {}\n'
                     'method: {}\n'
                     'request.scheme: {}\n'
                     'request.full_path: {}\n'
                     'user: {}\n\n\n'
                     'trace: {}'.format(message, timestamp,
                                        request.remote_addr, request.method,
                                        request.scheme, request.full_path,
                                        user, trace))
    # send email to admin
    if app.config['TESTING']:
        print(gathered_data)
    else:
        mail_message = gathered_data
        msg = Message('Error: ' + message[:40],
                      body=mail_message,
                      recipients=[app.config['ADMIN_MAIL']])
        mail.send(msg)
        flash(_('A message has been sent to the administrator'), 'info')
    bookcloud_before_request()
    return render_template('500.html', message=gathered_data), 500
def save_timing(name, t0):
    """
    Add a ``Timing`` row to the database session for the current request.

    :param name: label stored on the row
    :param t0: start time; the stored duration is ``time() - t0``
    """
    record = Timing(
        start=t0,
        path=request.full_path,
        name=name,
        seconds=time() - t0,
    )
    database.session.add(record)
def add_comment(user, article_id):
    """
    @api {post} /article/:id/comment Post comment to an article
    @apiName Post comment to an article
    @apiGroup Comment

    @apiUse AuthorizationTokenHeader

    @apiParam {String} id Article unique ID.
    @apiParam {String} comment The comment to be added
    @apiParam {Boolean} public The privacy setting for the comment, default is True
    @apiParamExample Request (Example)
        {
            "comment": "I hate you",
            "public": "false"
        }

    @apiUse UnauthorizedAccessError
    @apiUse ResourceDoesNotExist
    @apiUse BadRequest
    """
    app.logger.info('User {} Access {}'.format(user, request.full_path))

    # Pull the comment text and its visibility flag out of the request body.
    payload = RequestUtil.get_request()
    comment = payload.get('comment')
    public = payload.get('public')

    outcome = MongoUtil.add_comment(user, article_id, comment, public)

    # A string result is treated as an error and turned into an error response.
    if isinstance(outcome, str):
        return ResponseUtil.error_response(outcome)

    app.logger.info('User {} Create comment {}'.format(user, outcome.id))
    return jsonify(JsonUtil.serialize(outcome))
def upload(path=""):
    """
    Uploads a file to a place.

    GET renders the upload form. POST stores the uploaded file under the
    configured storage location joined with ``path`` and redirects back to
    the upload page.

    :param path: sub-path, relative to the storage location, to upload into
    :return: a redirect (not allowed to upload, or after a successful
        upload) or the rendered upload form
    :raises: aborts with 400 when no ``file`` part is present or the
        sanitised filename is not allowed
    """
    # Check that uploads are OK
    if not can_upload():
        return redirect(sign_auth_path(request.full_path))

    if request.method != 'POST':
        return render_template("browser/upload.html", path=path, title="Upload Files")

    # handle uploaded files
    if 'file' not in request.files:
        abort(400)  # General UA error
    uploaded = request.files["file"]

    # We default to using the name of the file provided,
    # but allow the filename to be changed via POST details.
    fname = uploaded.filename
    if 'name' in request.form:
        fname = request.form['name']
    safe_fname = secure_filename(fname)
    if not allowed_filename(safe_fname):
        abort(400)  # General client error

    # We're handling a potentially dangerous path, better run it through
    # the flask path joiner.
    basepath = app.config.get("storage", "location")
    fullpath = safe_join(basepath, path)

    # Save under the sanitised name that was just validated. The raw
    # client-supplied name may contain path separators or "..".
    uploaded.save(os.path.join(fullpath, safe_fname))
    flash("File uploaded successfully")
    return redirect(url_for('browser.upload', path=path))
def __call__(self, f):
    """
    Wrap view ``f`` so its response is cached in the module-level ``_cache``.

    The cache key is ``request.full_path`` for both lookup and store, so a
    stored response can be found again. When ``self.render_layout`` is set
    the response is wrapped between the rendered ``layout_header.html`` and
    ``layout_footer.html`` templates.

    :param f: the view function to wrap
    :return: the wrapping function
    """
    @functools.wraps(f)
    def my_decorator(*args, **kwargs):
        global _is_cached
        _is_cached = True

        # No cache backend configured: just call the view.
        if _cache is None:
            return f(*args, **kwargs)

        # Use one key for get and set. Previously the value was stored under
        # request.path but looked up under request.full_path.
        cache_key = request.full_path
        response = _cache.get(cache_key)
        if response is None:
            response = f(*args, **kwargs)
            _cache.set(cache_key, response, self.timeout)
            _is_cached = False

        if not self.render_layout:
            return response

        wrapped_response = make_response("%s%s%s" % (
            render_template("layout_header.html"),
            response,
            render_template("layout_footer.html")))
        # Carry status and headers over when the view returned a response object.
        if is_response(response):
            wrapped_response.status_code = response.status_code
            wrapped_response.headers = response.headers
            wrapped_response.status = response.status
            wrapped_response.mimetype = response.mimetype
        return wrapped_response

    # functools.wraps above already copied f's metadata; the extra
    # functools.update_wrapper call was a duplicate and has been dropped.
    return my_decorator
def setup_session():
    """
    Make the session permanent, ensure it carries a ``uuid``, and record the
    request in a ``RequestLog`` row.

    Requests whose User-Agent is in ``BLACKLIST_AGENT`` or shorter than 15
    characters are not logged (``g.request_log_id`` is set to 0) and receive
    the rendered error page instead.
    """
    session.permanent = True
    app.permanent_session_lifetime = timedelta(days=365*30)

    # A session without a uuid is "fresh": give it one.
    g.uuid_is_fresh = 'uuid' not in session
    if g.uuid_is_fresh:
        session['uuid'] = str(uuid.uuid4())

    now = datetime.now()
    headers = request.headers
    referrer = headers.get('Referer', '')
    path = request.path
    full_path = request.full_path
    agent = headers.get('User-Agent', '')

    if agent in BLACKLIST_AGENT or len(agent) < 15:
        g.request_log_id = 0
        return render_template(
            'error.html',
            code=200,
            message="Layer 8 error. If you want my data, DON'T SCRAPE (too much cpu load), contact me and I will give it to you",
        ), 200

    with db_session:
        req_log = RequestLog(
            uuid=session['uuid'],
            uuid_is_fresh=g.uuid_is_fresh,
            created_at=now,
            agent=agent,
            referrer=referrer,
            path=path,
            full_path=full_path,
        )
        # flush() is called before reading the id, as in the original.
        flush()
        g.request_log_id = req_log.id
def login_required(func):
    """decorator for login required

    This decorator can be used for declaring a function which requires
    authentication. A request is let through when the session contains a
    ``username`` or when HTTP basic auth credentials are accepted by the
    authentication provider. Otherwise a JSON 401 is returned for JSON
    requests, or the requested path is stored in ``session["redirect"]``
    and the client is redirected to ``/login``.

    Exceptions raised by ``func`` itself now propagate to the caller.
    Previously a bare ``except`` around the call swallowed them and the
    request was treated as unauthenticated.
    """
    @wraps(func)
    def handle_login_required(*args, **kwargs):
        # A session carrying a username counts as logged in.
        if "username" in session:
            return func(*args, **kwargs)

        # allow HTTP basic auth
        credentials = request.authorization
        if credentials is not None:
            try:
                authprovider = security.AuthenticationProvider.get_authprovider()
                authenticated = authprovider.authenticate(
                    credentials.username, credentials.password)
            except Exception:
                # Best effort, as before: a failing auth backend is treated
                # as a failed login rather than a crash.
                authenticated = False
            if authenticated:
                return func(*args, **kwargs)

        # on error: redirect to login page or return HTTP/401
        if json_check():
            return json_error("Unauthenticated", 401)
        session["redirect"] = request.full_path
        return redirect("/login")
    return handle_login_required
def before_request():
    """
    Log the feedback request and, when metadata exists for the current user,
    bind its ``tx_id`` to the logger and load the schema into ``g.schema_json``.
    """
    logger.info('feedback request', url_path=request.full_path)

    metadata = get_metadata(current_user)
    if not metadata:
        return

    logger.bind(tx_id=metadata['tx_id'])
    g.schema_json = load_schema_from_metadata(metadata)
def before_request():
    """
    Bind request identifiers to the logger, log the questionnaire request,
    and when metadata exists for the current user load the schema into
    ``g.schema_json`` and run ``_check_same_survey`` on the URL values.
    """
    metadata = get_metadata(current_user)
    has_metadata = bool(metadata)
    if has_metadata:
        logger.bind(tx_id=metadata['tx_id'])

    values = request.view_args
    logger.bind(
        eq_id=values['eq_id'],
        form_type=values['form_type'],
        ce_id=values['collection_id'],
    )
    logger.info('questionnaire request',
                method=request.method, url_path=request.full_path)

    # Schema loading happens after the log line above, as in the original.
    if has_metadata:
        g.schema_json = load_schema_from_metadata(metadata)
        _check_same_survey(values['eq_id'], values['form_type'], values['collection_id'])
def cached(timeout=1 * 60, key='view'):
    """
    Decorator factory that caches a view's return value in ``cache`` under
    ``'<key>/<request.full_path>'`` for ``timeout`` seconds.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = '{}/{}'.format(key, request.full_path)
            result = cache.get(cache_key)
            if result is None:
                # Miss: compute the value and remember it.
                result = f(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return decorated_function
    return decorator
def __call__(self, *args, **kwargs):
    """
    Call the wrapped route (``self._route``) while recording a ``PageView``.

    When ``INDEXING_ENABLED`` is falsy in the app config the route is called
    directly with no logging. Otherwise a ``PageView`` row is added to the
    DB session before the route runs. If the route raises, the session is
    rolled back, the page view and an ``ErrorLog`` built from the exception
    are committed, and the exception is re-raised. The ``finally`` block
    runs in both cases and commits the page view.
    """
    if not current_app.config.get('INDEXING_ENABLED', True):
        return self._route(*args, **kwargs)

    log = PageView(
        page=request.full_path,
        endpoint=request.endpoint,
        user_id=current_user.id,
        ip_address=request.remote_addr,
        version=__version__
    )
    errorlog = None
    # extract_objects returns four values; the last one says whether the
    # extraction should be repeated after the route has run.
    log.object_id, log.object_type, log.object_action, reextract_after_request = self.extract_objects(*args, **kwargs)
    db_session.add(log)  # Add log here to ensure pageviews are accurate

    try:
        return self._route(*args, **kwargs)
    except Exception as e:
        db_session.rollback()  # Ensure no lingering database changes remain after crashed route
        # The rollback above discarded the pending page view, so add it again.
        db_session.add(log)
        errorlog = ErrorLog.from_exception(e)
        db_session.add(errorlog)
        db_session.commit()
        raise_with_traceback(e)
    finally:
        # Extract object id and type after response generated (if requested) to ensure
        # most recent data is collected
        if reextract_after_request:
            log.object_id, log.object_type, log.object_action, _ = self.extract_objects(*args, **kwargs)

        # Link the page view to the error log when the route failed.
        if errorlog is not None:
            log.id_errorlog = errorlog.id
        db_session.add(log)
        db_session.commit()
def after_request(response):
    """Log remote address, method, scheme, full path and response status; return ``response`` unchanged."""
    fields = (
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        response.status,
    )
    logger.info('%s %s %s %s %s', *fields)
    return response
def exceptions(e):
    """
    Log the failed request together with the current traceback and return
    an error response.

    :param e: the exception being handled
    :return: ``(body, status)`` where ``status`` is ``e.status_code`` when
        the exception has that attribute and 500 otherwise
    """
    tb = traceback.format_exc()
    # The format string previously had six placeholders for five arguments,
    # so the logging module failed to format the record and the entry was
    # lost. It now has one placeholder per argument.
    logger.error('%s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
                 request.remote_addr,
                 request.method,
                 request.scheme,
                 request.full_path,
                 tb)
    # NOTE(review): presumably registered as a Flask error handler - confirm.
    # A bare int is not a valid Flask response, and most exceptions have no
    # status_code attribute, so return a (body, status) pair with a 500
    # fallback instead.
    status = getattr(e, 'status_code', 500)
    return 'INTERNAL SERVER ERROR', status
def after_request(response):
    """
    Log method, client address, scheme, full path and response status, then
    return ``response`` unchanged. The client address is taken from the
    ``HTTP_X_REAL_IP`` environ entry when present, else ``request.remote_addr``.
    """
    client_ip = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
    logger.info(
        '%s %s %s %s %s',
        request.method,
        client_ip,
        request.scheme,
        request.full_path,
        response.status,
    )
    return response
def exceptions(e):
    """
    Log the failed request together with the current traceback and return a
    plain 500 response.

    :param e: the exception being handled (the traceback is taken from
        ``traceback.format_exc()``)
    :return: ``('500 INTERNAL SERVER ERROR', 500)``
    """
    tb = traceback.format_exc()
    # format_exc() returns bytes only on Python 2. On Python 3 it is already
    # str and has no .decode(), so decode only when needed.
    if isinstance(tb, bytes):
        tb = tb.decode('utf-8')
    logger.error('%s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
                 request.environ.get('HTTP_X_REAL_IP', request.remote_addr),
                 request.method,
                 request.scheme,
                 request.full_path,
                 tb)
    return '500 INTERNAL SERVER ERROR', 500
def _before_request(self):
    """
    Redirect to the token view when the token checker reports a missing token.

    Returns ``None`` (no redirect) when no checker or token view is
    configured, when the current endpoint is the token view or one of its
    overrides, or when the checker returns ``None`` or a truthy value.
    """
    if self._has_token_checker is None or self.token_view is None:
        return

    # The token view itself and its overrides are never redirected.
    if request.endpoint == self.token_view:
        return
    if request.endpoint in self.token_view_overrides:
        return

    # The checker is called twice, as in the original: once for the None
    # test and once for the truth test.
    if self._has_token_checker() is None:
        return
    if self._has_token_checker():
        return

    return redirect(url_for(self.token_view, next=request.full_path))
def after_request(response):
    """
    Write one log record (at ERROR level) with timestamp, remote address,
    method, scheme, full path and response status; return ``response`` unchanged.
    """
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    fields = (
        timestamp,
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        response.status,
    )
    logger.error('%s %s %s %s %s %s', *fields)
    return response
def exceptions(e):
    """
    Log the failed request with the current traceback and return
    ``make_response(e, 405)``.
    """
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    fields = (
        timestamp,
        request.remote_addr,
        request.method,
        request.scheme,
        request.full_path,
        tb,
    )
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s', *fields)
    return make_response(e, 405)
def ipxe():
    """
    iPXE
    ---
    tags:
      - matchbox
    responses:
      200:
        description: iPXE script
        schema:
            type: string
      404:
        description: Not valid
        schema:
            type: string
    """
    app.logger.info("%s %s" % (request.method, request.url))
    try:
        # Relay the request (path + query string) to matchbox.
        target = "%s%s" % (app.config["MATCHBOX_URI"], request.full_path)
        upstream = requests.get(target)
        upstream.close()
        script = upstream.content.decode()

        # When a mac is supplied, record that machine as booting.
        mac = request.args.get("mac")
        if mac:
            repositories.machine_state.update(mac.replace("-", ":"), MachineStates.booting)

        return Response(script, status=200, mimetype="text/plain")
    except requests.exceptions.ConnectionError:
        app.logger.warning("404 for /ipxe")
        return "404", 404
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
    """
    Create a GitHub issue from a free-text report.

    The first line of ``content`` becomes the issue title and the rest (if
    any) starts the body. A "Reported on ... by ..." line is appended and,
    when a request is active, a block of request details. The title and
    body are always printed. The issue is only created when both
    ``github_user`` and ``github_password`` are configured.

    :param content: report text; ``None`` or ``''`` means nothing to report
    :param author: name recorded in the issue body
    :param location: where the report came from
    :param repo: full name of the repository to file the issue in
    :return: the created issue, or ``None`` when ``content`` is empty or
        GitHub credentials are not configured
    """
    if content is None or content == '':
        return None

    body = ''
    if '\n' in content:
        title, body = content.split('\n', 1)
        body += '\n\n'
    else:
        title = content
    body += 'Reported on {location} by {author}'.format(location=location, author=author)

    if request:
        # Dedent the template before interpolating. Dedenting the formatted
        # text breaks when an interpolated value (e.g. a decoded path)
        # contains a newline, because the common margin is then lost.
        details_template = textwrap.dedent("""
            --------------------------------------------------------------------------------
            Request Method: {method}
            Path: {full_path}
            Cookies: {cookies}
            Endpoint: {endpoint}
            View Args: {view_args}
            Person: {id}
            User-Agent: {user_agent}
            Referrer: {referrer}
        """)
        body += details_template.format(
            method=request.method,
            full_path=request.full_path,
            cookies=request.cookies,
            endpoint=request.endpoint,
            view_args=request.view_args,
            id=session.get('id', 'logged_out'),
            user_agent=request.headers.get('User-Agent'),
            referrer=request.referrer)

    print(title + '\n' + body)

    # Only check for github details at the last second to get log output even if github not configured.
    if not configuration.get('github_user') or not configuration.get('github_password'):
        return None

    github = Github(configuration.get('github_user'), configuration.get('github_password'))
    target_repo = github.get_repo(repo)
    issue = target_repo.create_issue(title=title, body=body)
    return issue
def after_request(response):
    """
    Log a bracketed access record (timestamp, remote address, user agent,
    method, scheme, full path, response status, referrer) and return
    ``response`` unchanged.
    """
    timestamp = strftime('%Y-%b-%d %H:%M:%S')
    useragent = request.user_agent
    fields = (
        timestamp,
        request.remote_addr,
        useragent,
        request.method,
        request.scheme,
        request.full_path,
        response.status,
        request.referrer,
    )
    logger.info('[%s] [%s] [%s] [%s] [%s] [%s] [%s] [%s]', *fields)
    return response
def exceptions(e):
    """
    Log the failed request together with the current traceback and return
    an error response.

    :param e: the exception being handled
    :return: ``(body, status)`` where ``status`` is ``e.status_code`` when
        the exception has that attribute and 500 otherwise
    """
    tb = traceback.format_exc()
    timestamp = strftime('[%Y-%b-%d %H:%M]')
    logger.error('%s %s %s %s %s 5xx INTERNAL SERVER ERROR\n%s',
                 timestamp,
                 request.remote_addr,
                 request.method,
                 request.scheme,
                 request.full_path,
                 tb)
    # NOTE(review): presumably registered as a Flask error handler - confirm.
    # A bare int is not a valid Flask response, and most exceptions have no
    # status_code attribute, so return a (body, status) pair with a 500
    # fallback instead.
    status = getattr(e, 'status_code', 500)
    return 'INTERNAL SERVER ERROR', status

#Run the main app...
def key_prefix():
    """Return the cache key for the current request: ``'view_'`` followed by ``request.full_path``."""
    return 'view_%s' % request.full_path
def get_api(config, tools, models):
    """
    Build the events blueprint.

    :param config: object providing ``get(section, option)`` and
        ``getint(section, option)``
    :param tools: mapping with ``cache``, ``mongo``, ``auth`` and ``pubsub`` entries
    :param models: mapping with an ``events`` entry
    :return: ``{'prefix': '/<api name>', 'ctrler': <blueprint>}``
    """
    # config
    API_NAME = config.get('api_journey', 'name')
    DEFAULT_PAGE_SIZE = config.get('api_journey', 'page_size')
    CACHE_TIMEOUT_SECS = config.getint('cache', 'timeout')

    cache = tools['cache']
    mongo = tools['mongo']  # not used below; lookup kept as in the original
    auth = tools['auth']
    event_model = models['events']

    # controller
    ctrler = Blueprint(API_NAME, __name__)

    def make_cache_key():
        """Make a key that includes GET parameters."""
        return request.full_path

    # route
    @ctrler.route('/actors/<actor_id>/events', methods=['GET'])
    @cache.cached(key_prefix=make_cache_key, timeout=CACHE_TIMEOUT_SECS)
    def get_actor_events(actor_id):
        query = request.args
        pagesize = int(query.get('_page_size', default=DEFAULT_PAGE_SIZE))
        page = int(query.get('_page', default=1))
        # parse condition
        cond = parse_query_to_mongo_cond(query, {'actor.id': actor_id})
        return jsonify(event_model.query_by_page(cond, pagesize, page))

    @ctrler.route('/events', methods=['GET'])
    @auth.login_required
    def list_all_events():
        return jsonify({'events': list(event_model.find())})

    @ctrler.route('/events', methods=['POST'])
    def send_new_event():
        # async operation: hand the event to the producer and answer right away
        producer = tools['pubsub']['producer']
        event = request.get_json()
        producer.publish(event)
        reply = {
            'status': 'message is sent to backend for processing',
            'event': event,
        }
        return jsonify(reply), 201

    return {'prefix': '/'+API_NAME, 'ctrler': ctrler}
def ignition():
    """
    Ignition
    ---
    tags:
      - matchbox
    responses:
      200:
        description: Ignition configuration
        schema:
            type: dict
      403:
        description: Matchbox unavailable
        schema:
            type: text/plain
      503:
        description: Matchbox is out of sync
        schema:
            type: text/plain
    """
    cache_key = "sync-notify"
    last_sync_ts = CACHE.get(cache_key)
    app.logger.debug("cacheKey: %s is set with value %s" % (cache_key, last_sync_ts))

    from_pxe = request.path == "/ignition-pxe"

    # we ignore the sync status if we arrived from /ignition-pxe because it's a discovery PXE boot
    if last_sync_ts is None and not from_pxe:
        app.logger.error("matchbox state is out of sync: cacheKey: %s is None" % cache_key)
        return Response("matchbox is out of sync", status=503, mimetype="text/plain")

    if from_pxe:
        app.logger.info("%s %s" % (request.method, request.url))

    matchbox_uri = application.config.get("MATCHBOX_URI")
    if not matchbox_uri:
        return Response("matchbox=%s" % matchbox_uri, status=403, mimetype="text/plain")

    try:
        # remove the -pxe from the path because matchbox only serve /ignition
        path = request.full_path.replace("/ignition-pxe?", "/ignition?")
        upstream = requests.get("%s%s" % (matchbox_uri, path))
        payload = upstream.content
        upstream.close()
        return Response(payload, status=upstream.status_code, mimetype="text/plain")
    except requests.RequestException as e:
        app.logger.error("fail to query matchbox ignition %s" % e)
        return Response("matchbox doesn't respond", status=502, mimetype="text/plain")