我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.remote_addr()。
def create_session(data): user = User.find_by_email_or_username(data['username']) if not (user and user.password == data['password']): return make_error_response('Invalid username/password combination', 401) session = Session(user=user) # TODO can this be made more accurate? session.ip_address = request.remote_addr if request.user_agent: session.user_agent = request.user_agent.string # Denormalize user agent session.platform = request.user_agent.platform session.browser = request.user_agent.browser db.session.add(session) db.session.commit() return session
def login(): if current_user.is_authenticated: return redirect(url_for('index')) form = LoginForm() if request.method == 'POST' and form.validate_on_submit(): username = form.username.data if '@' in username: existing_user = User.query.filter_by(email=username).first() else: existing_user = User.query.filter_by(username=username).first() if not (existing_user and existing_user.check_password(form.password.data)): flash('Invalid username or password. Please try again.', 'warning') return render_template('login.html', form=form) login_user(existing_user) db.session.add(Connection(user=existing_user, address=request.remote_addr)) db.session.commit() return redirect(url_for('index')) if form.errors: flash(form.errors, 'danger') return render_template('login.html', form=form)
def access_write_task(original_requester, key=None): """ Determine whether a requester can write to a task or its runs. """ requester = request.remote_addr # Local interfaces are always okay. if requester in local_ips: return True # Tasks without keys are limited to the original requester only if key is None: return requester == original_requester # Beyond here, the task has a key. request_key = arg_string("key") return (request_key is not None) and (request_key == key)
def get_role_credentials(api_version, requested_role, junk=None): try: role_params = roles.get_role_params_from_ip( request.remote_addr, requested_role=requested_role ) except roles.UnexpectedRoleError: return '', 403 try: assumed_role = roles.get_assumed_role_credentials( role_params=role_params, api_version=api_version ) except roles.GetRoleError as e: return '', e.args[0][0] return jsonify(assumed_role)
def zmirror_status(): """????????????""" if request.remote_addr and request.remote_addr != '127.0.0.1': return generate_simple_resp_page(b'Only 127.0.0.1 are allowed', 403) output = "" output += strx('extract_real_url_from_embedded_url', extract_real_url_from_embedded_url.cache_info()) output += strx('\nis_content_type_streamed', is_mime_streamed.cache_info()) output += strx('\nembed_real_url_to_embedded_url', embed_real_url_to_embedded_url.cache_info()) output += strx('\ncheck_global_ua_pass', check_global_ua_pass.cache_info()) output += strx('\nextract_mime_from_content_type', extract_mime_from_content_type.cache_info()) output += strx('\nis_content_type_using_cdn', is_content_type_using_cdn.cache_info()) output += strx('\nis_ua_in_whitelist', is_content_type_using_cdn.cache_info()) output += strx('\nis_mime_represents_text', is_mime_represents_text.cache_info()) output += strx('\nis_domain_match_glob_whitelist', is_domain_match_glob_whitelist.cache_info()) output += strx('\nverify_ip_hash_cookie', verify_ip_hash_cookie.cache_info()) output += strx('\nis_denied_because_of_spider', is_denied_because_of_spider.cache_info()) output += strx('\nis_ip_not_in_allow_range', is_ip_not_in_allow_range.cache_info()) output += strx('\n\ncurrent_threads_number', threading.active_count()) # output += strx('\nclient_requests_text_rewrite', client_requests_text_rewrite.cache_info()) # output += strx('\nextract_url_path_and_query', extract_url_path_and_query.cache_info()) output += strx('\n----------------\n') output += strx('\ndomain_alias_to_target_set', domain_alias_to_target_set) return "<pre>" + output + "</pre>\n"
def retrieveAlertsJson(): """ Retrieve last 5 Alerts in JSON without IPs """ # set cacheItem independent from url parameters, respect community index cacheEntry = request.url # get result from cache getCacheResult = getCache(cacheEntry, "url") if getCacheResult is not False: app.logger.debug('Returning /retrieveAlertsJson from Cache %s' % str(request.remote_addr)) return jsonify(getCacheResult) # query ES else: numAlerts = 35 # Retrieve last X Alerts from ElasticSearch and return JSON formatted with limited alert content returnResult = formatAlertsJson(queryAlertsWithoutIP(numAlerts, checkCommunityIndex(request))) setCache(cacheEntry, returnResult, 25, "url") app.logger.debug('UNCACHED %s' % str(request.url)) return jsonify(returnResult)
def accept_webhook(): try: log.debug("POST request received from {}.".format(request.remote_addr)) data = json.loads(request.data) if type(data) == dict: # older webhook style data_queue.put(data) else: # For RM's frame for frame in data: data_queue.put(frame) except Exception as e: log.error("Encountered error while receiving webhook ({}: {})".format(type(e).__name__, e)) abort(400) return "OK" # request ok # Thread used to distribute the data into various processes (for RocketMap format)
def __call__(self, form, field): if current_app.testing: return True if request.json: response = request.json.get('g-recaptcha-response', '') else: response = request.form.get('g-recaptcha-response', '') remote_ip = request.remote_addr if not response: raise ValidationError(field.gettext(self.message)) if not self._validate_recaptcha(response, remote_ip): field.recaptcha_error = 'incorrect-captcha-sol' raise ValidationError(field.gettext(self.message))
def get_accounts(): system_id = request.args.get('system_id') if not system_id: log.error("Request from {} is missing system_id".format(request.remote_addr)) abort(400) count = int(request.args.get('count', 1)) min_level = int(request.args.get('min_level', 1)) max_level = int(request.args.get('max_level', 40)) reuse = parse_bool(request.args.get('reuse')) or parse_bool(request.args.get('include_already_assigned')) banned_or_new = parse_bool(request.args.get('banned_or_new')) # lat = request.args.get('latitude') # lat = float(lat) if lat else lat # lng = request.args.get('longitude') # lng = float(lng) if lng else lng log.info( "System ID [{}] requested {} accounts level {}-{} from {}".format(system_id, count, min_level, max_level, request.remote_addr)) accounts = Account.get_accounts(system_id, count, min_level, max_level, reuse, banned_or_new) if len(accounts) < count: log.warning("Could only deliver {} accounts.".format(len(accounts))) return jsonify(accounts[0] if accounts and count == 1 else accounts)
def log_exception(self, exc_info): self.logger.error(""" Path: %s HTTP Method: %s Client IP Address: %s User Agent: %s User Platform: %s User Browser: %s User Browser Version: %s GET args: %s view args: %s URL: %s """ % ( request.path, request.method, request.remote_addr, request.user_agent.string, request.user_agent.platform, request.user_agent.browser, request.user_agent.version, dict(request.args), request.view_args, request.url ), exc_info=exc_info)
def log_request(code='-'): proto = request.environ.get('SERVER_PROTOCOL') msg = request.method + ' ' + request.path + ' ' + proto code = str(code) if code[0] == '1': # 1xx - Informational msg = color(msg, attrs=['bold']) if code[0] == '2': # 2xx - Success msg = color(msg, color='white') elif code == '304': # 304 - Resource Not Modified msg = color(msg, color='cyan') elif code[0] == '3': # 3xx - Redirection msg = color(msg, color='green') elif code == '404': # 404 - Resource Not Found msg = color(msg, color='yellow') elif code[0] == '4': # 4xx - Client Error msg = color(msg, color='red', attrs=['bold']) else: # 5xx, or any other response msg = color(msg, color='magenta', attrs=['bold']) logger.info('%s - - [%s] "%s" %s', request.remote_addr, log_date_time_string(), msg, code)
def configure_before_handlers(app): """Configures the before request handlers.""" @app.before_request def update_lastseen(): """Updates `lastseen` before every reguest if the user is authenticated.""" if current_user.is_authenticated: current_user.lastseen = datetime.datetime.utcnow() db.session.add(current_user) db.session.commit() if app.config["REDIS_ENABLED"]: @app.before_request def mark_current_user_online(): if current_user.is_authenticated: mark_online(current_user.username) else: mark_online(request.remote_addr, guest=True)
def _capture_change_info(self): """Capture the change info for the new version. By default calls: (1) :meth:`_fetch_current_user_id` which should return a string or None; and (2) :meth:`_fetch_remote_addr` which should return an IP address string or None; (3) :meth:`_get_custom_change_info` which should return a 1-depth dict of additional keys. These 3 methods generate a ``change_info`` and with 2+ top-level keys (``user_id``, ``remote_addr``, and any keys from :meth:`_get_custom_change_info`) """ change_info = { 'user_id': self._fetch_current_user_id(), 'remote_addr': self._fetch_remote_addr(), } extra_info = self._get_custom_change_info() if extra_info: change_info.update(extra_info) return change_info
def __call__(self, form, field): if current_app.testing: return True if request.json: challenge = request.json.get('recaptcha_challenge_field', '') response = request.json.get('recaptcha_response_field', '') else: challenge = request.form.get('recaptcha_challenge_field', '') response = request.form.get('recaptcha_response_field', '') remote_ip = request.remote_addr if not challenge or not response: raise ValidationError(field.gettext(self.message)) if not self._validate_recaptcha(challenge, response, remote_ip): field.recaptcha_error = 'incorrect-captcha-sol' raise ValidationError(field.gettext(self.message))
def report_event(): """ update DB with new container task state change event :return: str. 'true' if successful """ if not request.json: logger.error('received non-json data') abort(400) logger.info('Received event from {}'.format(request.remote_addr)) logger.debug('Event payload {}'.format(request.json)) event_id = request.json['event_id'] event = request.json['event'] timestamp = request.json['timestamp'] db.put(str(timestamp)+"_"+str(event_id), {'container_id': event_id, 'event_action': event, 'timestamp': timestamp}, 'ecs_id_mapper_events') return 'true'
def make_all_files_public(bucket_id): initSession() config_array = {} config_array["wait_time"] = 1 config_array["max_allowed_from_one_ip"] = 1 config_array["mode"] = 1 if session['logged_in'] or (request.remote_addr == "127.0.0.1" and can_login_local_without_auth()): public_file_sharing_manager = OwnStorjPublicFileSharingManager() files_manager = OwnStorjFilesManager(str(bucket_id)) files_list = files_manager.get_files_list() for file in files_list: if not public_file_sharing_manager.is_file_public(bucket_id=bucket_id, file_id=file["id"]): public_file_hash = public_file_sharing_manager.generate_public_file_hash( input_string=bucket_id + "_" + file["id"] + file["filename"] + str(file["size"]) + file["created"]) public_file_sharing_manager.save_public_file_to_db(bucket_id, file["id"], public_file_hash, public_file_hash, config_array, file["size"], file["filename"], file["created"]) return "SUCCESS", 200 else: return make_response(redirect("/login"))
def add_playlist(): initSession() if session['logged_in'] or (request.remote_addr == "127.0.0.1" and can_login_local_without_auth()): playlist_name = None playlist_category = None playlist_description = None success = False if request.method == 'GET': playlist_name = request.args.get('playlist_name') playlist_category = request.args.get('playlist_category') playlist_description = request.args.get('playlist_description') if playlist_name != None: ownstorj_playlist_manager = OwnStorjPlaylistManager() ownstorj_playlist_manager.add_new_playlist(playlist_name=playlist_name, playlist_description=playlist_description, playlist_category=playlist_category) return "SUCCESS", 200 else: return make_response(redirect("/login"))
def insert_file_to_playlist_endpoint(file_local_public_hash, playlist_id): initSession() ownstorj_playlist_manager = OwnStorjPlaylistManager() ownstorj_public_file_sharing_manager = OwnStorjPublicFileSharingManager() if session['logged_in'] or (request.remote_addr == "127.0.0.1" and can_login_local_without_auth()): if file_local_public_hash != "": public_file_details = ownstorj_public_file_sharing_manager.get_public_file_details_by_local_hash(file_local_public_hash) ownstorj_playlist_manager.insert_track(track_name=public_file_details[0]["file_name"] , track_local_file_id=file_local_public_hash , playlist_id=playlist_id) return '{result: "success"}', 200 # return the HTTP 200 statuss code - OK else: return '{result: "unauthorized"}', 401
def after_request_log(response): name = dns_resolve(request.remote_addr) current_app.logger.warn(u"""[client {ip} {host}] {http} "{method} {path}" {status} Request: {method} {path} Version: {http} Status: {status} Url: {url} IP: {ip} Hostname: {host} Agent: {agent_platform} | {agent_browser} | {agent_browser_version} Raw Agent: {agent} """.format(method=request.method, path=request.path, url=request.url, ip=request.remote_addr, host=name if name is not None else '?', agent_platform=request.user_agent.platform, agent_browser=request.user_agent.browser, agent_browser_version=request.user_agent.version, agent=request.user_agent.string, http=request.environ.get('SERVER_PROTOCOL'), status=response.status)) return response
def configuration(node=None): ''' Retrieve an osquery configuration for a given node. :returns: an osquery configuration file ''' current_app.logger.info( "%s - %s checking in to retrieve a new configuration", request.remote_addr, node ) config = node.get_config() # write last_checkin, last_ip db.session.add(node) db.session.commit() return jsonify(config, node_invalid=False)
def distributed_read(node=None): ''' ''' data = request.get_json() current_app.logger.info( "%s - %s checking in to retrieve distributed queries", request.remote_addr, node ) queries = node.get_new_queries() # need to write last_checkin, last_ip, and update distributed # query state db.session.add(node) db.session.commit() return jsonify(queries=queries, node_invalid=False)
def get_user(): """User information. .. note:: **Privacy note** A users IP address, user agent string, and user id (if logged in) is sent to a message queue, where it is stored for about 5 minutes. The information is used to: - Detect robot visits from the user agent string. - Generate an anonymized visitor id (using a random salt per day). - Detect the users host contry based on the IP address. The information is then discarded. """ return dict( ip_address=request.remote_addr, user_agent=request.user_agent.string, user_id=( current_user.get_id() if current_user.is_authenticated else None ), )
def downloadPic(source,id,fileName): file, ext = os.path.splitext(fileName) result =None article = Article.query.get(id) article.download_num = article.download_num+1 view = ArticleDownload() view.article_id=id view.ip = request.remote_addr if current_user.is_authenticated: view.user_id = current_user.get_id() db.session.add(view) db.session.commit() result = send_from_directory('pics/'+source, file+ext) return result
def itemDetail(id,form=None): article = Article.query.get(id) author = User.query.get(article.author_id) tag = Tag.query.filter_by(article_id=id,status=STATUS_NORMAL).all() keywords = "" for t in tag: keywords = keywords + t.title + ',' keywords = keywords[0:-1] article.view_num = article.view_num+1 view = ArticleView() view.article_id=id view.ip = request.remote_addr isLike = False if current_user.is_authenticated: user_id = current_user.get_id() view.user_id = user_id articleLike = ArticleLike.query.filter_by(article_id=id,user_id=user_id,status=STATUS_NORMAL).first() if articleLike != None: isLike = True db.session.add(view) db.session.commit() return render_template('item.html',article=article,author=author,isLike=isLike,tags=tag,form=form,keywords=keywords)
def login(): if request.method == 'POST': if request.form['user'] is '' or request.form['password'] is '': flash("Can't leave it blank",'danger') else: au=auth() user=au.check(request.form['user'],request.form['password']) if user: login_user(user) flash('Logged in successfully.','success') if user.is_admin: return redirect(url_for('admin')) return redirect(url_for('desktops')) else: flash('Username not found or incorrect password.','warning') remote_addr=request.headers['X-Forwarded-For'] if 'X-Forwarded-For' in request.headers else request.remote_addr disposables=app.isardapi.show_disposable(remote_addr) log.info(disposables) log.info(remote_addr) return render_template('login_disposables.html', disposables=disposables if disposables else '')
def voucher_login(): if request.method == 'POST': remote_addr=request.headers['X-Forwarded-For'] if 'X-Forwarded-For' in request.headers else request.remote_addr au=auth_voucher() if au.check_voucher(request.form['voucher']): if au.check_user_exists(request.form['email']): au.register_user(request.form['voucher'],request.form['email'],remote_addr) flash('Resetting account. Email with new isard user sent to '+request.form['email']+'. Please check your email','warning') else: au.register_user(request.form['voucher'],request.form['email'],remote_addr) flash('Email with isard user sent to '+request.form['email']+'. Please check your email','success') else: flash('Invalid registration voucher code','danger') disposables=False return render_template('login.html', disposables=disposables if disposables else '')
def game(): # Controller page time_full = time.perf_counter() player_ip = request.remote_addr # To redirect players who isent in the ip list and has thearfor ni team redirect_var = True for i in players: if player_ip == i.ip: print("OK") redirect_var = False if redirect_var: return redirect(url_for('index')) team_var = get_team(player_ip) direction_var = None if request.method == 'POST': # Adds a request to move the robot in the multithread queue q.put(request.form['submit']) print(time.perf_counter() - time_full) return render_template('game.html',team=team_var, direction=direction_var)
def dist_index(dist): netbsd_logo_url = url_for('static', filename='images/netbsd.png') if dist is None or dist == '': dist = 'NetBSD-current' if dist not in config.DB_PATHS and dist != 'favicon.ico': return redirect(url_for('search')) ip = request.remote_addr user_agent = request.user_agent platform = user_agent.platform browser = user_agent.browser version = user_agent.version language = user_agent.language referrer = request.referrer dblogger.log_page_visit(1, ip, platform, browser, version, language, referrer, int(time.time()), user_agent.string, dist) return render_template('index.html', netbsd_logo_url=netbsd_logo_url, distnames=distnames)
def catch_hash_page(some_hash): if ip_in_db(request.remote_addr): if some_hash in image_mappings: image = 'Banned!<p></p><img src="images/{}">'\ .format(image_mappings[some_hash]) return render_template( "index.html", content={"link": "", "text": image}) else: return render_template( "index.html", content={"link": "", "text": "wrong way:("}) add_ip(request.remote_addr) return render_template( "index.html", content={ "link": next_url(some_hash), "text": "You're on the right way!"})
def monkey_patch_email_field(form_class): """ We use our monkey patched Email validator, and also a html5 email input. """ from wtforms.fields.html5 import EmailField from flask_security.forms import (email_required, unique_user_email, get_form_field_label) import wtforms.validators from pygameweb.user.rbl import rbl def rbl_spamlist_validator(form, field): """ If the ip address of the person signing up is listed in a spam list, we abort with an error. """ remote_addr = request.remote_addr or None if rbl(remote_addr): abort(500) email_validator = wtforms.validators.Email(message='INVALID_EMAIL_ADDRESS') form_class.email = EmailField(get_form_field_label('email'), validators=[email_required, email_validator, rbl_spamlist_validator, unique_user_email])
def request_register(): username = str(request.form['username']) requestid = str(request.form['requestid']) client_ip = request.remote_addr if not pivportal.security.username_is_valid(username) or not pivportal.security.requestid_is_valid(requestid) or not pivportal.security.ip_is_valid(client_ip): # client_ip is None when testing, so its ok return Response(response=json.dumps({"response": " invalid request"}), status=400, mimetype="application/json") if pivportal.security.is_duplicate_register(username, requestid, redis_store.hgetall("requests")): return Response(response=json.dumps({"response": " invalid request"}), status=400, mimetype="application/json") this_request = {"username": username, "client_ip": client_ip, "authorized": False, "time": time.time()} redis_store.hmset("requests", {requestid: json.dumps(this_request)}) return Response(response=json.dumps({"response": "success"}), status=200, mimetype="application/json")
def request_status(): username = str(request.form['username']) requestid = str(request.form['requestid']) client_ip = request.remote_addr if not pivportal.security.username_is_valid(username) or not pivportal.security.requestid_is_valid(requestid) or not pivportal.security.ip_is_valid(client_ip): return Response(response=json.dumps({"response": " invalid request"}), status=400, mimetype="application/json") auth_requests = redis_store.hgetall("requests") if requestid in auth_requests: this_request = json.loads(auth_requests[requestid]) if this_request["username"] == username and this_request["client_ip"] == client_ip: if this_request["authorized"] == True: # Success redis_store.hdel("requests", requestid) return Response(response=json.dumps({"response": "success"}), status=200, mimetype="application/json") else: # Delete auth_request, it failed anyway redis_store.hdel("requests", requestid) return Response(response=json.dumps({"response": "Authentication Failure"}), status=401, mimetype="application/json")
def register(): """ Register a new client """ if request.method == 'POST': client_id = request.form.get('uuid','') user_agent = request.form.get('user_agent','') ip = request.remote_addr if client_id and user_agent and ip: print("Register: ", client_id) c = Client(client_id, user_agent, ip) db.session.add(c) # add pre flight scripts c.add_preflight() db.session.commit() return 'Hello {}!'.format(client_id) else: return 'UUID is not present'
def before_request(self,*args): """Verfies that the API is Vaild for the correct user and the IP address hasn't changed since log in""" signer = TimestampSigner(SECRET_KEY) api_key = request.headers['API_KEY'] Client_id = request.headers['Client_ID'] ip_address = request.remote_addr user = User.query.filter_by(Client_id=Client_id).first() if user == None: return make_response(jsonify({'Failure': 'Invaild User'}), 400) if api_key != user.api_key: return make_response(jsonify({'Failure': 'Incorrect API Key'}), 400) if ip_address != user.current_login_ip: return make_response(jsonify({'Failure': 'Incorrect IP for Client, Please Re-login in'}), 400) try: signer.unsign(api_key, max_age=86164) except: return make_response(jsonify({'Failure': 'Invaild API Key, please request new API Key'}), 400)
def route(): args = request.json args = args if args else {} cfg = args.get('cfg', None) log_request( args.get('user', 'unknown'), args.get('hostname', 'unknown'), request.remote_addr, request.method, request.path, args) try: endpoint = create_endpoint(request.method, cfg, args) json, status = endpoint.execute() except AutocertError as ae: status = 500 json = dict(errors={ae.name: ae.message}) return make_response(jsonify(json), status) if not json: raise EmptyJsonError(json) return make_response(jsonify(json), status)
def main(): iplow = ip2long('192.30.252.0') iphigh = ip2long('192.30.255.255') if request.remote_addr in range(iplow, iphigh): payload = request.get_json() if payload["repository"]["name"] == "Python-IRC-Bot": try: subprocess.check_call(["git", "pull"]) except subprocess.CalledProcessError: irc.privmsg("##wolfy1339", "git pull failed!") else: if "handlers.py" in payload['head_commit']['modified']: reload_handlers(bot) return flask.Response("Thanks.", mimetype="text/plain") return flask.Response("Wrong repo.", mimetype="text/plain") else: flask.abort(403)
def app_getinfo(ver): """ Returns Flask API Info """ response = dict() response['api_version'] = ver response['message'] = "Flask API Data" response['status'] = "200" response['method'] = request.method response['path'] = request.path response['remote_addr'] = request.remote_addr response['user_agent'] = request.headers.get('User-Agent') # GET attributes for key in request.args: response['GET ' + key] = request.args.get(key, '') # POST Attributes for key in request.form.keys(): response['POST ' + key] = request.form[key] return jsonify(response)