我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用flask.request.host_url()。
def createTeam(): """ """ if request.environ.get('HTTP_ORIGIN') == request.host_url[:-1]: newTeam = Team.query.filter_by(team_name=request.args['team']).first() dbEnv = AresSql.SqliteDB(request.args['report_name']) if not newTeam: team = Team(request.args['team'], request.args['team_email']) db.session.add(team) db.session.commit() newTeam = Team.query.filter_by(team_name=request.args['team']).first() team_id = newTeam.team_id role = request.args.get('role', 'user') dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s'); INSERT INTO env_auth (env_id, team_id) SELECT env_def.env_id, %s FROM env_def WHERE env_def.env_name = '%s' ;""" % (team_id, request.args['team'], role, team_id, request.args['report_name'])) return json.dumps('Success'), 200 return json.dumps('Forbidden', 403)
def __init__(self, **kwargs): """Initialises a new ``Self`` link instance. Accepts the same Keyword Arguments as :class:`.Link`. Additional Keyword Args: external (bool): if true, force link to be fully-qualified URL, defaults to False See Also: :class:`.Link` """ url = request.url external = kwargs.get('external', False) if not external and current_app.config['SERVER_NAME'] is None: url = request.url.replace(request.host_url, '/') return super(Self, self).__init__('self', url, **kwargs)
def auth(): # Handles the Azure AD authorization endpoint response and sends second response to get access token. try: # Gets the token_id from the flask response form dictionary. token_id = request.form['id_token'] # Gets the authorization code from the flask response form dictionary. code = request.form['code'] # Constructs redirect uri to be send as query string param to Azure AD token issuance endpoint. redirect_uri = '{0}auth'.format(request.host_url) # Constructs Azure AD token issuance endpoint url. url = issuance_url(token_id, c['AUTHORITY']) # Requests access token and stores it in session. token = access_token(url, redirect_uri, c['CLIENT_ID'], code, c['CLIENT_SECRET']) if token != '': session['access_token'] = token else: flash('Could not get access token.') except: flash('Something went wrong.') return redirect(url_for('home')) # This script runs the application using a development server.
def handle_content_message(event): if isinstance(event.message, ImageMessage): ext = 'jpg' elif isinstance(event.message, VideoMessage): ext = 'mp4' elif isinstance(event.message, AudioMessage): ext = 'm4a' else: return message_content = line_bot_api.get_message_content(event.message.id) with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix=ext + '-', delete=False) as tf: for chunk in message_content.iter_content(): tf.write(chunk) tempfile_path = tf.name dist_path = tempfile_path + '.' + ext dist_name = os.path.basename(dist_path) os.rename(tempfile_path, dist_path) line_bot_api.reply_message( event.reply_token, [ TextSendMessage(text='Save content.'), TextSendMessage(text=request.host_url + os.path.join('static', 'tmp', dist_name)) ])
def handle_file_message(event): message_content = line_bot_api.get_message_content(event.message.id) with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix='file-', delete=False) as tf: for chunk in message_content.iter_content(): tf.write(chunk) tempfile_path = tf.name dist_path = tempfile_path + '-' + event.message.file_name dist_name = os.path.basename(dist_path) os.rename(tempfile_path, dist_path) line_bot_api.reply_message( event.reply_token, [ TextSendMessage(text='Save file.'), TextSendMessage(text=request.host_url + os.path.join('static', 'tmp', dist_name)) ])
def forgot_password(): if request.method=="GET": return render_template("forgot_password.html") username = request.form["username"] email = request.form["email"] try: user = ldaptools.getuser(username) assert(user) assert(email == user.email[0]) token = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for x in range(24)) url = request.host_url+"recovery/"+token recoverymap[token] = username emailtools.render_email(email, "Password Recovery", "forgot_password.txt", url=url, config=app.config) flash("Email sent to "+email, "success") except Exception as e: print e flash("Username/Email mismatch", "danger") return redirect("/login")
def api_makepublic(): if request.method == 'GET': uniqueid = request.args['uniqueid'] file = File.query.filter_by(unique_id=uniqueid).first() if file is not None: if g.user.admin or (g.user.id == file.uploader_id): key = PublicKey() key.public = True if file.publickey is None: file.publickey = key db.session.commit() url = request.host_url + "pub/dl/" + key.hash button = get_sharebutton(file.publickey, 'ban', "Disable Public") return jsonify(response=responds['PUBLIC_KEY_GENERATED'], url=url, button=button) else: url = request.host_url + "pub/dl/" + file.publickey.hash return jsonify(response=responds['PUBLIC_KEY_ALREADY_GEN'], url=url) return jsonify(response=responds['SOME_ERROR'])
def initiate(): """ 1. step Initiate app installation """ args = request.args # get shop url from args shop_url = args.get('shop') # TODO: validate HMAC, so we know that request really is from shopify if not current_user.is_authenticated: return redirect(url_for('main.signup', next=url_join(request.host_url, url_for('shopify.initiate', shop=shop_url)))) api_key = current_app.config['SHOPIFY_API_KEY'] secret = current_app.config['SHOPIFY_API_SECRET'] url = get_permission_url(shop_url, api_key, secret) return redirect(url)
def is_safe_url(target): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and \ ref_url.netloc == test_url.netloc
def is_safe_url(target): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
def is_safe_redirect_url(target): """https://security.openstack.org/guidelines/dg_avoid-unvalidated-redirects.html""" # noqa host_url = urlparse(request.host_url) redirect_url = urlparse(urljoin(request.host_url, target)) return redirect_url.scheme in ('http', 'https') and \ host_url.netloc == redirect_url.netloc
def home(): # Renders the home page. redirect_uri = '{0}auth'.format(request.host_url) # Generates Azure AD authorization endpoint url with parameters so the user authenticates and consents, if consent is required. url = login_url(redirect_uri, c['CLIENT_ID'], c['RESOURCE'], c['AUTHORITY']) user = {} # Checks if access token has already been set in flask session. if 'access_token' in session: # Gets authenticated user details from SharePoint tenant if access token is acquired. user = user_details(c['RESOURCE'], session['access_token']) # Renders the index template with additional params for the login url and user. return render_template('index.html', url=url, user=user)
def send_mail_for_execute_success(sql_id): if (settings.EMAIL_SEND_ENABLE): sql_info = get_sql_info_by_id(sql_id) sql_info.status_str = settings.SQL_WORK_STATUS_DICT[sql_info.status] sql_info.host_url = request.host_url sql_info.email = "{0},{1}".format(cache.MyCache().get_user_email(sql_info.create_user_id), cache.MyCache().get_user_email(sql_info.audit_user_id)) if (len(sql_info.email) > 0): subject = "SQL??-[{0}]-????".format(sql_info.title) sql_info.work_url = "{0}execute/sql/execute/new/{1}".format(request.host_url, sql_info.id) content = render_template("mail_template.html", sql_info=sql_info) common_util.send_html(subject, sql_info.email, content) # ?????????? # ???????????
def get_request_log(): # parse args and forms values = '' if len(request.values) > 0: for key in request.values: values += key + ': ' + request.values[key] + ', ' route = '/' + request.base_url.replace(request.host_url, '') request_log = { 'route': route, 'method': request.method, } # ??xml ?? category = request_log['route'].split('/')[1] if category != "virtual_card": return request_log if len(request.get_data()) != 0: body = json.loads(request.get_data()) if "password" in body: body.pop("password") request_log['body'] = body if len(values) > 0: request_log['values'] = values return request_log
def is_safe_url(target): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and\ ref_url.netloc == test_url.netloc
def show_vulnerability(vulnerability_id): if authenticated() is False: return render_template('login.html', return_to='?return_to=' + request.host_url + 'vulnerability/{}'.format(vulnerability_id) ) return render_template('vulnerability.html', vulnerability_id=vulnerability_id)
def is_safe_url(target: str) -> bool: ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return ( # same scheme test_url.scheme in ('http', 'https') and # same host and port ref_url.netloc == test_url.netloc and # and different endoint ref_url.path != test_url.path )
def render_graph(): url = request.args.get('url') if not url: return 'Specify a graph URL in the request', 400 return render_template('graph.html', url=request.host_url[:-1] + url)
def register(): hostname = request.args.get('hostname') master = request.args.get('master') if not master: campaign = get_best_campaign() if not campaign: return 'No active campaigns', 404 instance = models.FuzzerInstance.create(hostname=hostname) instance.start_time = time.time() campaign.fuzzers.append(instance) campaign.commit() else: campaign = models.Campaign.get(id=master) if not campaign: return 'Could not find specified campaign', 404 if campaign.fuzzers.filter_by(master=True).first(): return 'Campaign already has a master', 400 instance = models.FuzzerInstance.create(hostname=hostname, master=True) instance.start_time = time.time() campaign.fuzzers.append(instance) campaign.commit() # avoid all hosts uploading at the same time from reporting at the same time deviation = random.randint(15, 30) return jsonify( id=instance.id, name=secure_filename(instance.name), program=campaign.executable_name, program_args=campaign.executable_args.split(' ') if campaign.executable_args else [], # TODO: add support for spaces args=campaign.afl_args.split(' ') if campaign.afl_args else [], campaign_id=campaign.id, campaign_name=secure_filename(campaign.name), download=request.host_url[:-1] + url_for('fuzzers.download', campaign_id=campaign.id), submit=request.host_url[:-1] + url_for('fuzzers.submit', instance_id=instance.id), submit_crash=request.host_url[:-1] + url_for('fuzzers.submit_crash', instance_id=instance.id), upload=request.host_url[:-1] + url_for('fuzzers.upload', instance_id=instance.id), upload_in=current_app.config['UPLOAD_FREQUENCY'] + deviation )
def download(campaign_id): campaign = models.Campaign.get(id=campaign_id) sync_dir = os.path.join(current_app.config['DATA_DIRECTORY'], secure_filename(campaign.name), 'sync_dir', '*.tar') return jsonify( executable=request.host_url[:-1] + url_for('fuzzers.download_executable', campaign_id=campaign.id), libraries=request.host_url[:-1] + url_for('fuzzers.download_libraries', campaign_id=campaign.id), testcases=request.host_url[:-1] + url_for('fuzzers.download_testcases', campaign_id=campaign.id), ld_preload=request.host_url[:-1] + url_for('fuzzers.download_ld_preload', campaign_id=campaign.id), dictionary=request.host_url[:-1] + url_for('fuzzers.download_dictionary', campaign_id=campaign.id) if campaign.has_dictionary else None, sync_dirs=[ request.host_url[:-1] + url_for('fuzzers.download_syncdir', campaign_id=campaign.id, filename=os.path.basename(filename)) for filename in glob.glob(sync_dir) ], sync_in=current_app.config['DOWNLOAD_FREQUENCY'], )
def analysis_queue(campaign_id): campaign = models.Campaign.get(id=campaign_id) return jsonify( program=campaign.executable_name, program_args=campaign.executable_args.split(' ') if campaign.executable_args else [], # TODO: add support for spaces crashes=[{ 'crash_id': crash.id, 'download': request.host_url[:-1] + url_for('fuzzers.download_crash', crash_id=crash.id) } for crash in campaign.crashes.filter_by(analyzed=False)] )
def _build_bundle_url(hit: dict, replica: Replica) -> str: uuid, version = hit['_id'].split('.', 1) return request.host_url + str(UrlBuilder() .set(path='v1/bundles/' + uuid) .add_query("version", version) .add_query("replica", replica.name) )
def _build_scroll_url(_scroll_id: str, per_page: int, replica: Replica, output_format: str) -> str: return request.host_url + str(UrlBuilder() .set(path="v1/search") .add_query('per_page', str(per_page)) .add_query("replica", replica.name) .add_query("_scroll_id", _scroll_id) .add_query("output_format", output_format) )
def api_unpublish(): if request.method == 'GET': uniqueid = request.args['uniqueid'] file = File.query.filter_by(unique_id=uniqueid).first() if file is not None: if g.user.admin or (g.user.id == file.uploader_id): key = file.publickey if key is not None: file.publickey.public = False db.session.commit() url = request.host_url + "pub/dl/" + key.hash return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=url) return jsonify(response=responds['SOME_ERROR'])
def api_publish(): if request.method == 'GET': uniqueid = request.args['uniqueid'] file = File.query.filter_by(unique_id=uniqueid).first() if file is not None: if g.user.admin or (g.user.id == file.uploader_id): key = file.publickey if (key is not None) and (key.public is False): file.publickey.public = True db.session.commit() url = request.host_url + "pub/dl/" + key.hash return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=url) return jsonify(response=responds['SOME_ERROR'])
def is_safe_url(target): if not target: return False ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
def isSafeUrl(target): """ Checks URL for safety to ensure that it does not redirect unexpectedly. Args: target (str): URL for the target to test. Returns: bool: True if the URL is safe. """ ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc ###################################################
def is_safe_url(target): ref_url = urlparse.urlparse(request.host_url) test_url = urlparse.urlparse(urlparse.urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and \ ref_url.netloc == test_url.netloc
def _DL(self, src, execute_in_gid, group_config_type, executor_permission, text): regex_list = packer_factory._DL regex_result = tool.regex_finder.find_match(regex_list, text) if regex_result is None: return if regex_result.match_at == 0: package_id = regex_result.group(1) including_sound = regex_result.group(2) is not None try: sticker_meta = self._sticker_dl.get_pack_meta(package_id) except tool.MetaNotFoundException: return error.main.miscellaneous(u'???????(??ID: {})'.format(package_id)) dl_result = self._sticker_dl.download_stickers(sticker_meta, including_sound) with self._flask_app.test_request_context(): url = request.host_url ret = [u'???????????????', u'?????????????', u'LINE??????????????????????????', u'?????????gif???? https://ezgif.com/apng-to-gif', u''] ret.append(u'??ID: {}'.format(sticker_meta.pack_id)) ret.append(u'{} (? {} ??)'.format(sticker_meta.title, sticker_meta.author)) ret.append(u'') ret.append(u'??????: (??)') ret.append(u'???? {:.3f} ?'.format(dl_result.downloading_consumed_time)) ret.append(u'???? {:.3f} ?'.format(dl_result.compression_consumed_time)) ret.append(u'???? {} ?'.format(dl_result.sticker_count)) return [bot.line_api_wrapper.wrap_text_message(txt, self._webpage_generator) for txt in (u'\n'.join(ret), url + dl_result.compressed_file_path.replace("\\", "\\\\"))] else: raise RegexNotImplemented(error.sys_command.regex_not_implemented(u'DL', regex_result.match_at, regex_result.regex))
def validate_redirect_url(url): if url is None or url.strip() == '': return False url_next = urlsplit(url) url_base = urlsplit(request.host_url) if (url_next.netloc or url_next.scheme) and url_next.netloc != url_base.netloc: return False return True
def is_safe_url(target): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) is_safe = test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc return is_safe
def try_login(self, identity_url, ask_for=None, ask_for_optional=None, extensions=None, immediate=False): """This tries to login with the given identity URL. This function must be called from the login_handler. The `ask_for` and `ask_for_optional`parameter can be a set of values to be asked from the openid provider, where keys in `ask_for` are marked as required, and keys in `ask_for_optional` are marked as optional. The following strings can be used in the `ask_for` and `ask_for_optional` parameters: ``aim``, ``blog``, ``country``, ``dob`` (date of birth), ``email``, ``fullname``, ``gender``, ``icq``, ``image``, ``jabber``, ``language``, ``msn``, ``nickname``, ``phone``, ``postcode``, ``skype``, ``timezone``, ``website``, ``yahoo`` `extensions` can be a list of instances of OpenID extension requests that should be passed on with the request. If you use this, please make sure to pass the Response classes of these extensions when initializing OpenID. `immediate` can be used to indicate this request should be a so-called checkid_immediate request, resulting in the provider not showing any UI. Note that this adds a new possible response: SetupNeeded, which is the server saying it doesn't have enough information yet to authorized or reject the authentication (probably, the user needs to sign in or approve the trust root). """ if ask_for and __debug__: for key in ask_for: if key not in ALL_KEYS: raise ValueError('invalid key %r' % key) if ask_for_optional: for key in ask_for_optional: if key not in ALL_KEYS: raise ValueError('invalid optional key %r' % key) try: consumer = Consumer(SessionWrapper(self), self.store_factory()) auth_request = consumer.begin(identity_url) if ask_for or ask_for_optional: self.attach_reg_info(auth_request, ask_for, ask_for_optional) if extensions: for extension in extensions: auth_request.addExtension(extension) except discover.DiscoveryFailure: self.signal_error(u'The OpenID was invalid') return redirect(self.get_current_url()) if self.url_root_as_trust_root: trust_root = request.url_root else: trust_root = request.host_url return redirect(auth_request.redirectURL(trust_root, self.get_success_url(), immediate=immediate))
def _upload(): if request.method == 'POST': file = request.files['files[]'] # get filename and folders file_name = secure_filename(file.filename) directory = str(unique_id()) upload_folder = myapp.config['UPLOAD_FOLDER'] if file.filename == '': return redirect(request.url) if file: #and allowed_file(file.filename) save_dir = os.path.join(upload_folder, directory) if not os.path.exists(save_dir): os.makedirs(save_dir) cmpl_path = os.path.join(save_dir, file_name) file.save(cmpl_path) size = os.stat(cmpl_path).st_size # create our file from the model and add it to the database dbfile = File(file_name, directory, size, file.mimetype) g.user.uploads.append(dbfile) db.session().add(dbfile) db.session().commit() if "image" in dbfile.mimetype: get_thumbnail(cmpl_path, "100") thumbnail_url = request.host_url + 'thumbs/' + directory else: thumbnail_url = "" url = request.host_url + 'uploads/' + directory delete_url = url delete_type = "DELETE" file = {"name": file_name, "url": url, "thumbnailUrl": thumbnail_url, "deleteUrl": delete_url, "deleteType": delete_type, "uid": directory} return jsonify(files=[file]) else: return jsonify(files=[{"name": file_name, "error": responds['FILETYPE_NOT_ALLOWED']}])