我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用flask.request.path。
def build_sitemap():
    """Regenerate the static sitemap file and redirect to the Redberry home.

    One entry is added per published post and one per category; the result
    is written to ``static/redberry/sitemap.xml`` under ``REDBERRY_ROOT`` and
    a success message is flashed.
    """
    from redberry.models import RedPost, RedCategory
    from apesmit import Sitemap

    sitemap = Sitemap(changefreq='weekly')

    for post in RedPost.all_published():
        post_url = url_for('redberry.show_post', slug=post.slug, _external=True)
        sitemap.add(post_url, lastmod=post.updated_at.date())

    for category in RedCategory.query.all():
        category_url = url_for('redberry.show_category',
                               category_slug=category.slug, _external=True)
        sitemap.add(category_url, lastmod=category.updated_at.date())

    target = os.path.join(REDBERRY_ROOT, 'static', 'redberry', 'sitemap.xml')
    with open(target, 'w') as handle:
        sitemap.write(handle)

    flash("Sitemap created.", 'success')
    return redirect(url_for('redberry.home'))


##############
# ADMIN ROUTES
##############
def after_request(response):
    """Decorate an outgoing response with CORS/cache headers and the token cookie.

    For logged-in users the session's ``token`` is mirrored into a cookie,
    creating the token first when the session does not hold one yet. Every
    response outside ``/api/autogen/serve/`` has its mimetype forced to JSON.
    """
    extra_headers = (
        ('Access-Control-Allow-Methods', 'GET, POST'),
        ('Access-Control-Allow-Credentials', 'true'),
        ('Access-Control-Allow-Headers', 'Content-Type, *'),
        ('Cache-Control', 'no-cache'),
        ('Cache-Control', 'no-store'),
    )
    for header_name, header_value in extra_headers:
        response.headers.add(header_name, header_value)

    if api.auth.is_logged_in():
        if 'token' in session:
            token = session['token']
        else:
            token = api.common.token()
            session['token'] = token
        response.set_cookie('token', token,
                            domain=app.config['SESSION_COOKIE_DOMAIN'])

    # JB: This is a hack. We need a better solution
    if not request.path.startswith("/api/autogen/serve/"):
        response.mimetype = 'application/json'
    return response
def make_pagination_headers(limit, curpage, total, link_header=True):
    """Build pagination headers, optionally including a ``Link`` header.

    :param limit: page size.
    :param curpage: zero-based index of the current page.
    :param total: total number of items.
    :param link_header: when false, only the ``X-*`` headers are returned.
    :returns: dict of header name to header value (all strings).
    """
    lastpage = int(math.ceil(1.0 * total / limit) - 1)
    headers = {
        'X-Total-Count': str(total),
        'X-Limit': str(limit),
        'X-Page-Last': str(lastpage),
        'X-Page': str(curpage),
    }
    if not link_header:
        return headers

    base = "{}?%s".format(request.path)

    def page_url(page):
        # Current query arguments with the page argument overridden.
        return base % urlencode(dict(request.args, **{PAGE_ARG: page}))

    links = {'first': page_url(0), 'last': page_url(lastpage)}
    if curpage:
        links['prev'] = page_url(curpage - 1)
    if curpage < lastpage:
        links['next'] = page_url(curpage + 1)

    headers['Link'] = ",".join(
        ['<%s>; rel="%s"' % (url, rel) for rel, url in links.items()])
    return headers


# pylama:ignore=R0201
def iam_sts_credentials(api_version, requested_role, junk=None):
    """Return assumed-role credentials for the caller as JSON.

    Requests for API versions without IAM support are passed through
    unchanged. An empty 404 is returned when ``requested_role`` does not match
    the role expected for the calling address.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)

    try:
        role_params = roles.get_role_params_from_ip(
            request.remote_addr,
            requested_role=requested_role,
        )
    except roles.UnexpectedRoleError:
        template = "Role name {0} doesn't match expected role for container"
        log.error(template.format(requested_role))
        return '', 404

    log.debug('Providing assumed role credentials for {0}'.format(role_params['name']))
    credentials = roles.get_assumed_role_credentials(
        role_params=role_params,
        api_version=api_version,
    )
    return jsonify(credentials)
def ip_whitelist_add(ip_to_allow, info_record_dict=None):
    """Add an IP to the in-memory whitelist and persist it.

    Does nothing when the IP is already whitelisted. Otherwise the IP is added
    to ``single_ip_allowed_set``, the cache of ``is_ip_not_in_allow_range`` is
    cleared, the IP is appended to the whitelist file, and a line is appended
    to the verification log (best effort).

    :param ip_to_allow: the IP address string to allow.
    :param info_record_dict: optional extra info; its ``repr`` is logged.
    """
    if ip_to_allow in single_ip_allowed_set:
        return

    dbgprint('ip white added', ip_to_allow, 'info:', info_record_dict)
    single_ip_allowed_set.add(ip_to_allow)
    is_ip_not_in_allow_range.cache_clear()
    append_ip_whitelist_file(ip_to_allow)

    try:
        with open(zmirror_root(human_ip_verification_whitelist_log), 'a', encoding='utf-8') as fp:
            fp.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " " + ip_to_allow
                     + " " + str(request.user_agent)
                     + " " + repr(info_record_dict) + "\n")
    except Exception:  # coverage: exclude
        # Logging stays best-effort, but a bare `except:` would also swallow
        # SystemExit / KeyboardInterrupt, so only ordinary errors are caught.
        errprint('Unable to write log file', os.path.abspath(human_ip_verification_whitelist_log))
        traceback.print_exc()
def check_auth(func):
    """
    This decorator for routes checks that the user is authorized (or that no login is required).
    If they aren't, their intended destination is stored and they're sent to get authorized.
    It has to be placed AFTER @app.route() so that it can capture `request.path`.

    Raises AssertionError when a logged-in user's email is not in the
    configured whitelist.
    """
    if 'login' not in conf:
        return func

    # inspired by <https://flask-login.readthedocs.org/en/latest/_modules/flask_login.html#login_required>
    @functools.wraps(func)
    def decorated_view(*args, **kwargs):
        if current_user.is_anonymous:
            print('unauthorized user visited {!r}'.format(request.path))
            session['original_destination'] = request.path
            return redirect(url_for('get_authorized'))

        print('{} visited {!r}'.format(current_user.email, request.path))
        # Explicit raise instead of `assert`: assert statements are removed
        # under `python -O`, which would silently disable the whitelist check.
        # The exception type is kept so existing error handling is unaffected.
        if current_user.email.lower() not in conf.login['whitelist']:
            raise AssertionError(current_user)
        return func(*args, **kwargs)

    return decorated_view
def after_request(response):
    """Add CORS/cache headers and the token cookie to an outgoing response.

    For logged-in users the session's ``token`` is sent as a cookie; a new
    token is generated and stored in the session when none exists. Every
    response outside ``/api/autogen/serve/`` is marked as JSON.

    :param response: the response object being returned to the client.
    :returns: the same response object, modified in place.
    """
    response.headers.add('Access-Control-Allow-Methods', 'GET, POST')
    response.headers.add('Access-Control-Allow-Credentials', 'true')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type, *')
    response.headers.add('Cache-Control', 'no-cache')
    response.headers.add('Cache-Control', 'no-store')

    if api.auth.is_logged_in():
        if 'token' in session:
            response.set_cookie('token', session['token'])
        else:
            csrf_token = api.common.token()
            session['token'] = csrf_token
            response.set_cookie('token', csrf_token)

    # JB: This is a hack. We need a better solution
    if request.path[0:19] != "/api/autogen/serve/":
        # Was misspelled 'appication/json', which is not a valid media type.
        response.mimetype = 'application/json'
    return response
def convert(content_type, content):
    """Return the decoded content as a response, caching it per request path.

    On a cache miss both arguments are decoded, SVG content is converted to
    PNG, and the (content, content_type) pair is cached under the request
    path. On a hit the cached pair is used and the arguments are ignored.
    """
    cache_key = request.path
    cached_pair = cache.get(cache_key)

    if cached_pair is not None:
        content = cached_pair[0]
        content_type = cached_pair[1]
    else:
        content = decode(content)
        content_type = decode(content_type).decode()
        if content_type.startswith('image/svg+xml'):
            content = __svg2png(content)
            content_type = 'image/png'
        cache.set(cache_key, (content, content_type), timeout=CACHE_RETENTION)

    response = make_response(content)
    response.content_type = content_type
    response.cache_control.max_age = CACHE_RETENTION
    return response
def init_utils(app):
    """Register template filters, template globals and request hooks on ``app``."""
    filters = app.jinja_env.filters
    filters['unix_time'] = unix_time
    filters['unix_time_millis'] = unix_time_millis
    filters['long2ip'] = long2ip

    app.jinja_env.globals.update(
        pages=pages,
        can_register=can_register,
        mailserver=mailserver,
        ctf_name=ctf_name,
    )

    @app.context_processor
    def inject_user():
        # Expose the session contents to templates for authenticated users.
        return dict(session) if authed() else dict()

    @app.before_request
    def needs_setup():
        # The setup page and static files are reachable before setup is done.
        on_setup_or_static = request.path == '/setup' or request.path.startswith('/static')
        if on_setup_or_static:
            return
        if not is_setup():
            return redirect('/setup')
def _atlassian_jwt_post_token(self):
    """Return a dict with the current URL signed with a JWT for a POST.

    Returns an empty dict when ``g.ac_client`` is not set. Any ``jwt`` query
    argument is dropped before signing, then replaced by the new token.
    """
    client = getattr(g, 'ac_client', None)
    if not client:
        return dict()

    query_args = request.args.copy()
    query_args.pop('jwt', None)

    unsigned_url = request.path + '?' + urlencode(query_args)
    query_args['jwt'] = encode_token(
        'POST', unsigned_url, g.ac_client.clientKey, g.ac_client.sharedSecret)

    signed_url = request.path + '?' + urlencode(query_args)
    return dict(atlassian_jwt_post_url=signed_url)
def log_exception(self, exc_info):
    """Log an error describing the current request, with the given exc_info."""
    template = """ Path: %s HTTP Method: %s Client IP Address: %s User Agent: %s User Platform: %s User Browser: %s User Browser Version: %s GET args: %s view args: %s URL: %s """
    agent = request.user_agent
    details = (
        request.path,
        request.method,
        request.remote_addr,
        agent.string,
        agent.platform,
        agent.browser,
        agent.version,
        dict(request.args),
        request.view_args,
        request.url,
    )
    self.logger.error(template % details, exc_info=exc_info)
def log_request(code='-'):
    """Log the current request line, colored by the response status code.

    :param code: HTTP status code (anything convertible with ``str``); the
        default ``'-'`` and any unrecognized value use the fallback style.
    """
    proto = request.environ.get('SERVER_PROTOCOL')
    msg = request.method + ' ' + request.path + ' ' + proto
    code = str(code)

    # `startswith` tolerates an empty code string, where code[0] would raise.
    if code.startswith('1'):  # 1xx - Informational
        msg = color(msg, attrs=['bold'])
    # This branch used to be a separate `if`, so 1xx codes fell through to
    # the final `else` and were re-colored as errors.
    elif code.startswith('2'):  # 2xx - Success
        msg = color(msg, color='white')
    elif code == '304':  # 304 - Resource Not Modified
        msg = color(msg, color='cyan')
    elif code.startswith('3'):  # 3xx - Redirection
        msg = color(msg, color='green')
    elif code == '404':  # 404 - Resource Not Found
        msg = color(msg, color='yellow')
    elif code.startswith('4'):  # 4xx - Client Error
        msg = color(msg, color='red', attrs=['bold'])
    else:  # 5xx, or any other response
        msg = color(msg, color='magenta', attrs=['bold'])

    logger.info('%s - - [%s] "%s" %s', request.remote_addr, log_date_time_string(), msg, code)
def authenticated(fn):
    """Mark a route as requiring authentication.

    Unauthenticated sessions are redirected to ``login``. Authenticated
    sessions missing a name, email or institution are redirected to
    ``profile``, except on ``/logout`` and ``/profile`` themselves.
    """
    @wraps(fn)
    def decorated_function(*args, **kwargs):
        if not session.get('is_authenticated'):
            return redirect(url_for('login', next=request.url))

        if request.path == '/logout':
            return fn(*args, **kwargs)

        profile_complete = all(
            session.get(field) for field in ('name', 'email', 'institution'))
        if not profile_complete and request.path != '/profile':
            return redirect(url_for('profile', next=request.url))

        return fn(*args, **kwargs)

    return decorated_function
def __call__(self, func):
    """Wrap ``func`` so that the configured cache routes are cleared after it runs.

    When ``USE_CACHE`` is false the wrapped function's response is returned
    untouched. Otherwise, for every path in ``self.routes`` the anonymous
    cache entry is deleted and, if ``self.clear_for_user`` is set and a
    ``user`` keyword argument was passed, that user's entry as well.
    """
    @wraps(func)
    def decorator(*args, **kwargs):
        response = func(*args, **kwargs)
        if not USE_CACHE:
            return response

        for path in self.routes:
            anonymous_route = '%s,%s,%s' % (path, json.dumps(self.args), None)
            Cache.delete(unicode(anonymous_route))

            user = None
            if self.clear_for_user:
                user = kwargs.get('user', None)
            Logger.debug(unicode('clear %s from cache' % (anonymous_route)))

            if user:
                user_route = '%s,%s,%s' % (path, json.dumps(self.args), user)
                Cache.delete(unicode(user_route))
                Logger.debug(unicode('clear %s from cache' % (user_route)))

        return response

    return decorator
def sitemap():
    """Rebuild ``static/sitemap.xml`` from the 100 newest films and serve it.

    Each row of the ``film`` table becomes one ``<url>`` entry whose
    ``lastmod`` is the row's ``create_time`` formatted as a date.

    :returns: the response produced by ``send_from_directory`` for the
        requested path (leading slash removed).
    """
    conn, curr = sphinx_conn()
    querysql = 'SELECT info_hash,create_time FROM film order by create_time desc limit 100'
    curr.execute(querysql)
    rows = curr.fetchall()
    sphinx_close(curr, conn)

    entry_template = ('<url><loc>{}</loc><lastmod>{}</lastmod>'
                      '<changefreq>daily</changefreq><priority>0.8</priority></url>')
    sitemaplist = [
        entry_template.format(
            domain + 'hash/{}.html'.format(row['info_hash']),
            datetime.datetime.fromtimestamp(int(row['create_time'])).strftime('%Y-%m-%d'),
        )
        for row in rows
    ]
    xml_content = '<?xml version="1.0" encoding="UTF-8"?><urlset>{}</urlset>'.format(
        "".join(sitemaplist))

    # The file is opened in binary mode, so text must be encoded first;
    # writing str to a 'wb' handle raises TypeError on Python 3.
    payload = xml_content if isinstance(xml_content, bytes) else xml_content.encode('utf-8')
    with open('static/sitemap.xml', 'wb') as f:
        f.write(payload)  # the with-block closes the file; no explicit close()

    return send_from_directory(app.static_folder, request.path[1:])
def chan_friends(channel):
    """
    If you GET this endpoint, go to /api/v1/channel/<channel>/friend
    with <channel> replaced for the channel of the friends you want to get

    <channel> can either be an int that matches the channel, or a string
    that matches the owner's username
    """
    # model = request.path.split("/")[-1]
    model = "Friend"

    # Numeric identifiers select by channel id, anything else by owner name.
    fields = ({"channelId": int(channel)} if channel.isdigit()
              else {"owner": channel.lower()})

    # NOTE(review): `results` is not defined inside this function; presumably
    # a module-level name. Confirm, otherwise this call raises NameError.
    packet, code = generate_response(
        model,
        request.path,
        request.method,
        request.values,
        data=results,
        fields=fields,
    )

    return make_response(jsonify(packet), code)

    # There was an error!
    # if not str(code).startswith("2"):
    #    return make_response(jsonify(packet), code)
    # NOTE: Not needed currently, but this is how you would check


# TODO: Fix this endpoint to remove timing elements (friends are forever)
# TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
def chan_messages(channel):
    """
    If you GET this endpoint, go to /api/v1/channel/<channel>/messages
    with <channel> replaced for the messages you want to get
    """
    model = "Message"

    # Numeric identifiers select by channel id, anything else by owner name.
    fields = ({"channelId": int(channel)} if channel.isdigit()
              else {"owner": channel.lower()})

    packet, code = generate_response(
        model, request.path, request.method, request.values, fields=fields)

    return make_response(jsonify(packet), code)
def user_quotes(channel):
    """
    If you GET this endpoint, go to /api/v1/channel/<channel>/quote
    with <channel> replaced for the channel you want to get quotes for
    """
    model = "quote"

    # Only non-deleted quotes are selected, by channel id or by owner name.
    fields = {"deleted": False}
    if channel.isdigit():
        fields["channelId"] = int(channel)
    else:
        fields["owner"] = channel.lower()

    packet, code = generate_response(
        model, request.path, request.method, request.values, fields=fields)

    return make_response(jsonify(packet), code)
def user_commands(channel):
    """
    If you GET this endpoint, simply go to /api/v1/channel/<channel>/command
    with <channel> replaced for the channel you want to get commands for
    """
    model = "Command"

    # Only non-deleted commands are selected, by channel id or channel name.
    fields = {"deleted": False}
    if channel.isdigit():
        fields["channelId"] = int(channel)
    else:
        fields["channelName"] = channel.lower()

    packet, code = generate_response(
        model, request.path, request.method, request.values, fields=fields)

    return make_response(jsonify(packet), code)
def after_request_log(response):
    """Log a warning-level summary of the request/response pair.

    The client's host name is looked up with ``dns_resolve``; ``'?'`` is
    logged when the lookup returns ``None``.

    :param response: the outgoing response; only its ``status`` is read.
    :returns: the response, unchanged.
    """
    name = dns_resolve(request.remote_addr)
    # Logger.warn is a deprecated alias that was removed in Python 3.13;
    # Logger.warning is the supported spelling with identical behavior.
    current_app.logger.warning(u"""[client {ip} {host}] {http} "{method} {path}" {status} Request: {method} {path} Version: {http} Status: {status} Url: {url} IP: {ip} Hostname: {host} Agent: {agent_platform} | {agent_browser} | {agent_browser_version} Raw Agent: {agent} """.format(
        method=request.method,
        path=request.path,
        url=request.url,
        ip=request.remote_addr,
        host=name if name is not None else '?',
        agent_platform=request.user_agent.platform,
        agent_browser=request.user_agent.browser,
        agent_browser_version=request.user_agent.version,
        agent=request.user_agent.string,
        http=request.environ.get('SERVER_PROTOCOL'),
        status=response.status))
    return response
def request_start():
    """Log the path, formatted arguments, real IP and Accept header of a request."""
    headers = request.headers
    content_type = headers.get('Accept') or ''
    real_ip = headers.get('X-Real-Ip') or ''

    pieces = (request.path, format_args(request.args), real_ip, content_type)
    Log.info(' '.join(pieces))

    # Test content_type (disabled):
    # if content_type and content_type not in AVAILABLE_CONTENT_TYPES:
    #     results = {'message' : 'Content-Type not supported',
    #                'message_code' : 8
    #                }
    #     return {'error' : 'content-type'}
    #     return self.render(results, status_code = 405)
def cached(key='view{path}'):
    """Decorator factory that caches a function's results.

    The cache key comes from ``cache_key(key, *args, **kwargs)``. A stored
    value of ``None`` counts as a miss, so the function is called again.
    """
    def decorator(func):
        """Wrap ``func`` with a cache lookup."""
        @wraps(func)
        def decorated_function(*args, **kwargs):
            lookup_key = cache_key(key, *args, **kwargs)
            hit = cache.get(lookup_key)
            if hit is None:
                print("DB HIT: \"{}\"".format(lookup_key))
                hit = func(*args, **kwargs)
                cache.set(lookup_key, hit)
            return hit

        return decorated_function

    return decorator
def post(year, month, day, post_name):
    """Render a published post, redirecting to its corrected URL if needed.

    The lookup key is the request path with the ``/post/`` prefix removed;
    the route parameters themselves are not used. Aborts with 404 when the
    post is not found.
    """
    prefix_length = len('/post/')
    rel_url = request.path[prefix_length:]

    canonical_rel_url = storage.fix_post_relative_url(rel_url)
    if canonical_rel_url != rel_url:
        # it's not the correct relative url, so redirect
        return redirect(request.url_root + 'post/' + canonical_rel_url)

    found = storage.get_post(rel_url, include_draft=False)
    if found is None:
        abort(404)

    entry = found.to_dict()
    del entry['raw_content']
    rendered = get_parser(found.format).parse_whole(found.raw_content)
    entry['content'], entry['toc'], entry['toc_html'] = parse_toc(rendered)
    entry['url'] = make_abs_url(found.unique_key)

    return custom_render_template(entry['layout'] + '.html', entry=entry)
def entity(entity_id):
    """Render the entity page with its possible duplicates and its links.

    Raises ``NotFound`` when ``load_entity`` returns ``None``.
    """
    record = load_entity(entity_id, request.auth)
    if record is None:
        raise NotFound()

    # load possible duplicates via fingerprint expansion
    # fingerprints = expand_fingerprints(entity.fingerprints, request.auth)
    duplicates_query = Query(request.args, prefix='duplicates_',
                             path=request.path, limit=5)
    # query.add_facet('countries', 'Countries', country_label)
    duplicates = search_duplicates(record.id, record.fingerprints,
                                   duplicates_query, request.auth)

    # load links
    links_query = Query(request.args, prefix='links_',
                        path=request.path, limit=10)
    links_query.add_facet('schemata', 'Types', link_schema_label)
    links_query.add_facet('remote.countries', 'Countries', country)
    links = search_links(record, links_query, request.auth)

    return render_template("entity.html", entity=record, links=links,
                           duplicates=duplicates)
def store_form(form):
    """Persist a submitted form as a ``FormEntry`` with one ``FieldEntry`` per field.

    Submitted fields with no matching ``Field`` row for the current page are
    skipped. For ``file_input`` fields the uploaded file is saved under
    ``FORM_UPLOADS_PATH`` and the saved file name is stored as the value.

    :param form: iterable of submitted fields exposing ``name`` and ``data``.
    """
    entry = FormEntry(form_id=g.page.id)

    for f in form:
        field = Field.query.filter_by(form_id=g.page.id).filter_by(name=f.name).one_or_none()
        if field is None:
            continue

        field_entry = FieldEntry(field_id=field.id)
        data = f.data

        if field.type == 'file_input':
            file_data = request.files[field.name]
            # splitext() keeps the leading dot on the extension, so the
            # template must not add one (the old '.%s' produced 'name..ext').
            extension = os.path.splitext(file_data.filename)[-1]
            filename = secure_filename('%s-%s-%s%s' % (
                field.name,
                date_stamp(),
                str(time.time()).replace('.', ''),
                extension,
            ))
            file_data.save(os.path.join(app.config['FORM_UPLOADS_PATH'], filename))
            # Store the sanitized name actually used on disk; previously the
            # unsanitized name was stored and could differ from the file.
            data = filename

        field_entry.value = data
        db.session.add(field_entry)
        entry.fields.append(field_entry)

    db.session.add(entry)
    db.session.commit()
def route():
    """Dispatch the request to an endpoint and return its JSON response.

    An ``AutocertError`` raised by the endpoint becomes a 500 response with
    an ``errors`` mapping. ``EmptyJsonError`` is raised when the endpoint
    produces a falsy payload.
    """
    args = request.json or {}
    cfg = args.get('cfg', None)

    log_request(
        args.get('user', 'unknown'),
        args.get('hostname', 'unknown'),
        request.remote_addr,
        request.method,
        request.path,
        args)

    try:
        endpoint = create_endpoint(request.method, cfg, args)
        payload, status = endpoint.execute()
    except AutocertError as ae:
        failure = dict(errors={ae.name: ae.message})
        return make_response(jsonify(failure), 500)

    if not payload:
        raise EmptyJsonError(payload)
    return make_response(jsonify(payload), status)
def _create_etag(path, recursive, lm=None):
    """Return a SHA-1 hex digest usable as an ETag, or None.

    :param path: request path; paths ending in ``/files`` or
        ``/files/sdcard`` also mix the printer's SD file list into the hash.
    :param recursive: flag folded into the hash via ``str()``.
    :param lm: last-modified value; computed with ``_create_lastmodified``
        when omitted.
    :returns: the hex digest, or ``None`` when no last-modified value exists.
    """
    if lm is None:
        lm = _create_lastmodified(path, recursive)
    if lm is None:
        return None

    digest = hashlib.sha1()

    def feed(text):
        # hashlib only accepts bytes on Python 3; encode text explicitly.
        digest.update(text.encode("utf-8"))

    feed(str(lm))
    feed(str(recursive))

    if path.endswith(("/files", "/files/sdcard")):
        # include sd data in etag
        feed(repr(sorted(printer.get_sd_files(), key=lambda x: x[0])))

    return digest.hexdigest()
def get_full_path(ver):
    """L2-L4 path query; pass onepath=True for a single path.

    Reads ``src``, ``dst`` and the optional ``onepath`` / ``depth`` query
    arguments. Returns the version-check error, if any, before querying.
    """
    error = version_chk(ver, ['v1.0', 'v1.1', 'v2'])
    if error:
        return error

    options = {
        "onepath": request.args.get('onepath') == "True",
        "depth": request.args.get('depth', '20'),
    }

    try:
        result = nglib.query.path.get_full_path(
            request.args['src'], request.args['dst'], options)
        return jsonify(upgrade_api(result, ver))
    except ResultError as e:
        return jsonify(errors.json_error(e.expression, e.message))
def get_routed_path(ver):
    """Routed path query; accepts an optional vrf.

    Reads ``src``, ``dst`` and the optional ``onepath`` / ``depth`` / ``vrf``
    query arguments. Returns the version-check error, if any, before querying.
    """
    error = version_chk(ver, ['v1.0', 'v1.1', 'v2'])
    if error:
        return error

    options = {
        "onepath": request.args.get('onepath') == "True",
        "depth": request.args.get('depth', '20'),
        "VRF": request.args.get('vrf', 'default'),
    }

    try:
        result = nglib.query.path.get_routed_path(
            request.args['src'], request.args['dst'], options)
        return jsonify(upgrade_api(result, ver))
    except ResultError as e:
        return jsonify(errors.json_error(e.expression, e.message))
def render_redberry(template_name, **kwargs):
    """Render a template, flagging AMP rendering for ``.amp`` request paths."""
    # For Accelerated Mobile Pages (AMP, ref: https://www.ampproject.org)
    # use templates with a .amp suffix
    wants_amp = '.amp' in request.path
    if wants_amp:
        kwargs['render_amp'] = True

    return render_template(template_name, **kwargs)
def login_required(func):
    """Require an authenticated session for the wrapped view.

    Unauthenticated POST requests abort with 401; any other unauthenticated
    request is redirected to the login page with the current path as ``next``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        is_post = request.method == 'POST'
        if is_authenticated():
            return func(*args, **kwargs)
        if is_post:
            return abort(401)
        return redirect("/login/" + "?next=" + request.path)

    return wrapper
def image_list(masterip):
    """Forward an image-list request for the session user and return JSON text."""
    payload = {"user": session['username']}
    # path = request.path[:request.path.rfind("/")]
    # path = path[:path.rfind("/")+1]
    reply = dockletRequest.post("/image/list/", payload, masterip)
    logger.debug("image" + str(type(reply)))
    return json.dumps(reply)
def monitor_request(comid,infotype,masterip):
    """Forward a monitor request for the session user and return JSON text.

    The forwarded sub-path is the request path with everything up to and
    including its third ``/`` removed.
    """
    payload = {"user": session['username']}

    sub_path = request.path
    for _ in range(3):
        # find() returns -1 when no slash is left, leaving the text unchanged.
        sub_path = sub_path[sub_path.find("/") + 1:]

    logger.debug(sub_path + "_____" + masterip)
    reply = dockletRequest.post("/monitor/" + sub_path, payload, masterip)
    logger.debug("monitor" + str(type(reply)))
    return json.dumps(reply)
def monitor_user_request(issue,masterip):
    """Forward a per-user monitor request and return the reply as JSON text."""
    payload = {"user": session['username']}
    target = "/monitor/user/" + str(issue) + "/"
    logger.debug(target + "_____" + masterip)
    reply = dockletRequest.post(target, payload, masterip)
    logger.debug("monitor" + str(type(reply)))
    return json.dumps(reply)
def jupyter_prefix():
    """Redirect to the login page, carrying over the ``next`` argument if given."""
    next_path = request.args.get('next')
    login_url = '/login/'
    if next_path is not None:
        login_url = login_url + '?next=' + next_path
    return redirect(login_url)
def login_required(func):
    """Require a valid ``token`` form field for the wrapped view.

    The wrapped function is called as
    ``func(cur_user, cur_user.username, request.form, *args, **kwargs)``.
    A JSON failure message is returned when the token is missing or when
    ``G_usermgr.auth_token`` does not return a user.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # G_usermgr is only read here, so no `global` declaration is needed.
        logger.info("get request, path: %s", request.path)

        token = request.form.get("token", None)
        if token is None:
            return json.dumps({'success':'false', 'message':'user or key is null'})

        cur_user = G_usermgr.auth_token(token)
        if cur_user is None:
            return json.dumps({'success':'false', 'message':'token failed or expired', 'Unauthorized': 'True'})

        return func(cur_user, cur_user.username, request.form, *args, **kwargs)

    return wrapper
def get_request_information():
    """
    Returns a dictionary of contextual information about the user at the time of logging.

    Outside a request context the dictionary is empty. Inside one it has a
    "request" entry, plus a "user" entry when a user is logged in.

    Returns:
        The dictionary.
    """
    information = {}
    if not has_request_context():
        return information

    agent = request.user_agent
    information["request"] = {
        "api_endpoint_method": request.method,
        "api_endpoint": request.path,
        "ip": request.remote_addr,
        "platform": agent.platform,
        "browser": agent.browser,
        "browser_version": agent.version,
        "user_agent": agent.string,
    }

    if api.auth.is_logged_in():
        user = api.user.get_user()
        team = api.user.get_team()
        group_names = [group["name"] for group in api.team.get_groups()]
        information["user"] = {
            "username": user["username"],
            "email": user["email"],
            "team_name": team["team_name"],
            "groups": group_names,
        }

    return information
def teacher_session():
    """Guard ``/teacher/`` paths: require credentials and a teacher session.

    Returns a redirect when access is denied, otherwise ``None``.
    """
    if '/teacher/' not in request.path:
        return None
    if 'credentials' not in flask.session:
        return flask.redirect(flask.url_for('index'))
    if not flask.session['is_teacher']:
        return flask.redirect(flask.url_for('register'))
    return None
def student_session():
    """Guard ``/student/`` paths: require credentials and a student session.

    Returns a redirect when access is denied, otherwise ``None``.
    """
    if '/student/' not in request.path:
        return None
    if 'credentials' not in flask.session:
        return flask.redirect(flask.url_for('index'))
    if not flask.session['is_student']:
        return flask.redirect(flask.url_for('register'))
    return None


# make sure user is authenticated w/ live session on every request
def manage_session():
    """Send users without credentials through the OAuth flow.

    The OAuth callback (to avoid a redirect loop), the index page and the
    logout route are always allowed. For any other path without credentials
    in the session, the path is remembered and the user is redirected.
    """
    open_paths = ('/oauth/callback', '/', '/oauth/logout')
    if request.path in open_paths:
        return None

    # validate that user has valid session
    # add the google user info into session
    if 'credentials' in flask.session:
        return None

    flask.session['redirect'] = request.path
    return flask.redirect(flask.url_for('oauth2callback'))
def iam_role_info(api_version, junk=None):
    """Return IAM role info for the calling address as JSON, or an empty 404.

    Requests for API versions without IAM support are passed through.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)

    role_params = roles.get_role_params_from_ip(request.remote_addr)
    if not role_params['name']:
        log.error('Role name not found; returning 404.')
        return '', 404

    log.debug('Providing IAM role info for {0}'.format(role_params['name']))
    return jsonify(roles.get_role_info_from_params(role_params))
def iam_role_name(api_version):
    """Return the IAM role name for the calling address, or an empty 404.

    Requests for API versions without IAM support are passed through.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)

    role_name = roles.get_role_params_from_ip(request.remote_addr)['name']
    if role_name:
        return role_name

    log.error('Role name not found; returning 404.')
    return '', 404
def load_ip_whitelist_file():
    """Load the IP whitelist from the whitelist file.

    :returns: a set of the non-empty, stripped lines of the file; an empty
        set when the file does not exist.
    """
    whitelist_path = zmirror_root(human_ip_verification_whitelist_file_path)
    set_buff = set()
    if os.path.exists(whitelist_path):
        with open(whitelist_path, 'r', encoding='utf-8') as fp:
            # Read every line: the old single readline() loaded only the first
            # entry. Presumably the file holds one IP per line -- confirm
            # against append_ip_whitelist_file().
            for line in fp:
                ip = line.strip()
                if ip:
                    set_buff.add(ip)
    return set_buff
def response_cookies_deep_copy():
    """
    Return the raw ``Set-Cookie`` header values of the remote response,
    rewritten for the mirror.

    It's a BAD hack to get RAW cookies headers, but so far, we don't have better way.
    We'd go DEEP inside the urllib's private method to get raw headers

    raw_headers example:
    [('Cache-Control', 'private'),
    ('Content-Length', '48234'),
    ('Content-Type', 'text/html; Charset=utf-8'),
    ('Server', 'Microsoft-IIS/8.5'),
    ('Set-Cookie','BoardList=BoardID=Show; expires=Mon, 02-May-2016 16:00:00 GMT; path=/'),
    ('Set-Cookie','aspsky=abcefgh; expires=Sun, 24-Apr-2016 16:00:00 GMT; path=/; HttpOnly'),
    ('Set-Cookie', 'ASPSESSIONIDSCSSDSSQ=OGKMLAHDHBFDJCDMGBOAGOMJ; path=/'),
    ('X-Powered-By', 'ASP.NET'),
    ('Date', 'Tue, 26 Apr 2016 12:32:40 GMT')]

    :returns: list of (possibly rewritten) ``Set-Cookie`` header value strings.
    """
    raw_headers = parse.remote_response.raw._original_response.headers._headers
    header_cookies_string_list = []
    for name, value in raw_headers:
        if name.lower() != 'set-cookie':
            continue

        if my_host_scheme == 'http://':
            # Remove the Secure attribute when this host uses plain http.
            value = value.replace('Secure;', '')
            value = value.replace(';Secure', ';')
            value = value.replace('; Secure', ';')

        if 'httponly' in value.lower():
            if enable_aggressive_cookies_path_rewrite:
                # Aggressive mode: rewrite the cookie path to /
                value = regex_cookie_path_rewriter.sub('path=/;', value)
            elif enable_aggressive_cookies_path_rewrite is not None:
                # Rewrite the HttpOnly cookie's path to the mirrored url
                # eg(/extdomains/a.foobar.com): path=/verify; -> path=/extdomains/a.foobar.com/verify
                if parse.remote_domain not in domain_alias_to_target_set:  # do not rewrite main domains
                    # Raw strings: '\g' in a normal literal is an invalid
                    # escape sequence (a SyntaxWarning on Python 3.12+).
                    value = regex_cookie_path_rewriter.sub(
                        r'\g<prefix>=/extdomains/' + parse.remote_domain + r'\g<path>', value)

        header_cookies_string_list.append(value)
    return header_cookies_string_list
def assemble_parse():
    """Fill in the ``parse`` object from the decoded mirror URL."""
    decoded = decode_mirror_url()

    parse.remote_domain = decoded['domain']  # type: str
    parse.is_https = decoded['is_https']  # type: bool
    parse.remote_path = decoded['path']  # type: str
    parse.remote_path_query = decoded['path_query']  # type: str
    parse.is_external_domain = is_external_domain(parse.remote_domain)
    parse.remote_url = assemble_remote_url()  # type: str

    # Everything after the first '//' of the remote url.
    scheme_end = parse.remote_url.find('//') + 2
    parse.url_no_scheme = parse.remote_url[scheme_end:]  # type: str

    # Record the domain in recent_domains.
    recent_domains[parse.remote_domain] = True

    dbgprint('after assemble_parse, url:', parse.remote_url, ' path_query:', parse.remote_path_query)
def delete(short_name):
    """Show the delete-confirmation page (GET) or delete the project.

    On any non-GET request the directories listed under the project's
    ``directory_names`` info key are removed from disk if they exist, the
    project is deleted, the action is audit-logged and the user is
    redirected to their profile.
    """
    (project, owner, n_tasks, n_task_runs,
     overall_progress, last_activity,
     n_results) = project_by_shortname(short_name)
    title = project_title(project, "Delete")
    ensure_authorized_to('read', project)
    ensure_authorized_to('delete', project)
    pro = pro_features()
    project_sanitized, owner_sanitized = sanitize_project_owner(
        project, owner, current_user)

    if request.method == 'GET':
        page = dict(template='/projects/delete.html',
                    title=title,
                    project=project_sanitized,
                    owner=owner_sanitized,
                    n_tasks=n_tasks,
                    overall_progress=overall_progress,
                    last_activity=last_activity,
                    pro_features=pro,
                    csrf=generate_csrf())
        return handle_content_type(page)

    ######### this block was edited by shruthi
    if "directory_names" in project.info.keys():
        for directory in project.info["directory_names"]:
            if os.path.exists(directory):
                shutil.rmtree(directory)  # deleting the actual folder
    project_repo.delete(project)
    ########## end block

    auditlogger.add_log_entry(project, None, current_user)
    flash(gettext('Project deleted!'), 'success')
    profile_url = url_for('account.profile', name=current_user.name)
    return redirect_content_type(profile_url)
def _check_if_redirect_to_password(project):
    """Return a redirect to the password page if the project needs a password.

    Returns ``None`` when no password is needed for the current user or IP.
    """
    timeout = current_app.config.get('PASSWD_COOKIE_TIMEOUT')
    cookie_handler = CookieHandler(request, signer, timeout)
    manager = ProjectPasswdManager(cookie_handler)

    if not manager.password_needed(project, get_user_id_or_ip()):
        return None

    return redirect(url_for('.password_required',
                            short_name=project.short_name,
                            next=request.path))
def ratelimit(limit, per, send_x_headers=True,
              scope_func=lambda: request.remote_addr,
              key_func=lambda: request.endpoint,
              path=lambda: request.path):
    """
    Decorator for limiting the access to a route.

    Returns the function if within the limit, otherwise TooManyRequests error.
    Any exception raised while checking the limit or running the view is
    turned into a response by ``error.format_exception``.
    """
    def decorator(f):
        @wraps(f)
        def rate_limited(*args, **kwargs):
            try:
                bucket = 'rate-limit/%s/%s/' % (key_func(), scope_func())
                limiter = RateLimit(bucket, limit, per, send_x_headers)
                g._view_rate_limit = limiter
                if limiter.over_limit:
                    raise TooManyRequests
                return f(*args, **kwargs)
            except Exception as e:
                return error.format_exception(e, target=path(),
                                              action=f.__name__)

        # @wraps already copied f's metadata onto the wrapper.
        return rate_limited

    return decorator