The following 42 code examples, extracted from open-source Python projects, illustrate how to use the flask.request.base_url attribute.
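Before the extracted examples, here is a minimal sketch (not taken from any of the projects below; the /items route, host, and port are illustrative assumptions) of what request.base_url holds: the scheme, host, and path of the current request with the query string stripped. Compare it with request.url and request.url_root:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/items')
def items():
    # For a request to http://localhost:5000/items?page=2:
    #   request.base_url -> 'http://localhost:5000/items'        (no query string)
    #   request.url      -> 'http://localhost:5000/items?page=2' (full URL)
    #   request.url_root -> 'http://localhost:5000/'             (application root)
    return jsonify(base_url=request.base_url,
                   url=request.url,
                   url_root=request.url_root)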
def review(title):
    """
    This URL only exists for legacy reasons so try to find the article
    where it is in the new scheme and return 301 to indicate moved.
    """
    branch = request.args.get('branch', u'master')

    article = models.search_for_article(title)
    if article is not None:
        return redirect(filters.url_for_article(article, branch=branch), 301)

    return missing_article(request.base_url, title=title, branch=branch)


# Note this URL is directly linked to the filters.url_for_article filter.
# These must be changed together!
def headerize(func):
    """The decorator adds header links to response for paginator"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        data = func(*args, **kwargs)
        resp = jsonify(data)
        fmt = '<{base}?page={page}&per_page={per_page}>; rel="{rel}"'
        links = [{'page': data['page'] - 1, 'rel': 'prev'},
                 {'page': data['page'] + 1, 'rel': 'next'},
                 {'page': 1, 'rel': 'first'},
                 {'page': data['num_pages'], 'rel': 'last'}]
        header = ', '.join(fmt.format(base=request.base_url,
                                      page=i['page'],
                                      per_page=data['per_page'],
                                      rel=i['rel'])
                           for i in links)
        resp.headers.extend({'Link': header})
        return resp
    return wrapper
def __init__(self, data):
    self._structure = data
    self._dispatcher = {
        'get': requests.get,
        'post': requests.post,
        'put': requests.put,
        'delete': requests.delete}
    self._get_system_settings()
    dgst = data.get('password-digest')
    if (dgst is not None and isinstance(dgst, basestring)
            and hasattr(hashlib, dgst.lower())):
        m = operator.methodcaller(dgst.lower(), self.billing_password)(hashlib)
        self.billing_password = m.hexdigest()
    _url = urlparse(request.base_url)
    self.master_url = '{0}://{1}'.format(_url.scheme, _url.netloc)
def spark_application(app_id):
    """Mock of the Spark jobs REST resource."""
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    d = st.fixed_dictionaries({
        'jobId': st.integers(0),
        'name': st.text(),
        'submissionTime': st.text(),
        'completionTime': st.text(),
        'stageIds': st.lists(st.integers(0), average_size=3),
        'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
        'numTasks': st.integers(0),
        'numActiveTasks': st.integers(0),
        'numCompletedTasks': st.integers(0),
        'numSkippedTasks': st.integers(0),
        'numFailedTasks': st.integers(0),
        'numActiveStages': st.integers(0),
        'numCompletedStages': st.integers(0),
        'numSkippedStages': st.integers(0),
        'numFailedStages': st.integers(0),
    })

    result = json.dumps(st.lists(d, average_size=3).example())
    redis.set(request.base_url, result)
    return jsonify(result)
def _cache_key(ui, url=None, locale=None, additional_key_data=None):
    if url is None:
        url = request.base_url
    if locale is None:
        locale = g.locale.language if g.locale else "en"

    k = "ui:{}:{}:{}".format(ui, url, locale)
    if callable(additional_key_data):
        try:
            ak = additional_key_data()
            if ak:
                # we have some additional key components, let's attach them
                if not isinstance(ak, (list, tuple)):
                    ak = [ak]
                k = "{}:{}".format(k, ":".join(ak))
        except:
            _logger.exception("Error while trying to retrieve additional cache key parts for ui {}".format(ui))
    return k
def process_request():
    """
    Process request.

    - Set api_url
    """
    base_url = request.base_url
    referrer = request.headers.get('referer')
    if referrer:
        # we use referrer as base url
        parts = urlparse(referrer)
        base_url = urlunparse((parts.scheme, parts.netloc, '', '', '', ''))
    elif APP_URL:
        base_url = APP_URL

    # Used in building full URIs
    request.api_url = urljoin(base_url, API_PREFIX + '/')

    request.user = flask_session.get('user')
    request.realm = flask_session.get('realm', 'employees')
def verify_token(token):
    """Verify the supplied token and check user role is correct for the requested resource"""
    if not token:
        current_app.logger.debug(f'Token not supplied {request.base_url}')
        return False

    try:
        decoded_token = base64.b64decode(token).decode('utf-8')
    except UnicodeDecodeError:
        current_app.logger.debug(f'Unable to decode token {request.base_url}')
        return False  # Can't decode token, so fail login

    valid_token, user_id = AuthenticationService.is_valid_token(decoded_token, 604800)
    if not valid_token:
        current_app.logger.debug(f'Token not valid {request.base_url}')
        return False

    if tm.is_pm_only_resource:
        if not UserService.is_user_a_project_manager(user_id):
            current_app.logger.debug(f'User {user_id} is not a PM {request.base_url}')
            return False

    tm.authenticated_user_id = user_id  # Set the user ID on the decorator as a convenience
    return True  # All tests passed token is good for the requested resource
def before_request():
    request.start_time = datetime.now()


# @bp.after_request
# def after_request(resp):
#     try:
#         if '_' in request.endpoint:
#             dbcon = influx_db.connection
#             point = [{"measurement": config.APP_NAME,
#                       "tags": {"method": request.method, "status": resp.status_code,
#                                "endpoint": request.endpoint},
#                       "fields": {"base_url": request.base_url,
#                                  "remote_address": request.remote_addr,
#                                  'response_time': (datetime.now() - request.start_time).microseconds}}]
#             dbcon.write_points(point)
#     except Exception as e:
#         pass
#         logger.debug('Write api statistics data to influxdb failed, error?' + e.message)
#     return resp
def forbidden(message="Forbidden."):
    log.debug("Response 403: %s", message)
    log.info("Denied %s %s %s", request.remote_addr, request.method, request.base_url)
    return Response(message + "\n", status=403, mimetype="text/plain")
def base_url(path=None):
    return request.base_url + ("" if path is None else "/" + path)
def index():
    return jsonify(name='Payments REST API Service',
                   version='1.0',
                   docs=request.base_url + 'apidocs/index.html',
                   site=request.base_url + 'payments'), status.HTTP_200_OK


######################################################################
# LIST ALL PAYMENTS
######################################################################
def partner(article_path):
    """
    URL for articles from hackhands blog -- these articles are not editable.
    """
    try:
        repo_path = '%s/%s' % (app.config['SECONDARY_REPO_OWNER'],
                               app.config['SECONDARY_REPO_NAME'])
    except KeyError:
        flash('No secondary guide configuration', category='error')
        return redirect(url_for('index'))

    if article_path is None:
        articles = models.get_available_articles(status=PUBLISHED,
                                                 repo_path=repo_path)
        return render_template('review.html', articles=articles)

    article = models.read_article(article_path, repo_path=repo_path)
    if article is None:
        flash('Failed reading guide', category='error')
        return redirect(url_for('index'))

    # Use http as canonical protocol for url to avoid having two separate
    # comment threads for an article. Disqus uses this variable to save
    # comments.
    canonical_url = request.base_url.replace('https://', 'http://')

    form = forms.SignupForm()

    return render_template('article.html', article=article,
                           allow_edits=False, canonical_url=canonical_url,
                           form=form, disclaimer=True)
def _finish_span(self, response=None, exception=None):
    """ Close and finish the active span if it exists. """
    span = getattr(g, 'flask_datadog_span', None)
    if span:
        if span.sampled:
            error = 0
            code = response.status_code if response else None
            method = request.method if request else None

            # if we didn't get a response, but we did get an exception, set
            # codes accordingly.
            if not response and exception:
                code = 500
                # The 3 next lines might not be strictly required, since `set_traceback`
                # also get the exception from the sys.exc_info (and fill the error meta).
                # Since we aren't sure it always work/for insuring no BC break, keep
                # these lines which get overridden anyway.
                error = 1
                span.set_tag(errors.ERROR_TYPE, type(exception))
                span.set_tag(errors.ERROR_MSG, exception)
                # The provided `exception` object doesn't have a stack trace attached,
                # so attach the stack trace with `set_traceback`.
                span.set_traceback()

            # the endpoint that matched the request is None if an exception
            # happened so we fallback to a common resource
            resource = code if not request.endpoint else request.endpoint
            span.resource = compat.to_unicode(resource).lower()
            span.set_tag(http.URL, compat.to_unicode(request.base_url or ''))
            span.set_tag(http.STATUS_CODE, code)
            span.set_tag(http.METHOD, method)
            span.error = error
        span.finish()

    # Clear our span just in case.
    g.flask_datadog_span = None

# Request hook methods
def after_request(response):
    """ called after every request """
    # log the endpoint hit and any errors
    delta = int((time.time() - g.start_time) * 1000)
    start_utc = datetime.datetime.utcfromtimestamp(g.start_time)
    username = request.authorization.username if request.authorization else None
    err_msg = response.get_data(as_text=True) if response.status_code // 100 >= 4 else None
    Logger.endpoint_hit(start_utc, delta, request.base_url, username,
                        request.method, response.status_code, err_msg)
    return response
def create_link_string(page, last_page, per_page):
    """Returns a string representing the value of the ``Link`` header.

    `page` is the number of the current page, `last_page` is the last page in
    the pagination, and `per_page` is the number of results per page.
    """
    linkstring = ''
    if page < last_page:
        next_page = page + 1
        linkstring = LINKTEMPLATE.format(request.base_url, next_page,
                                         per_page, 'next') + ', '
    linkstring += LINKTEMPLATE.format(request.base_url, last_page,
                                      per_page, 'last')
    return linkstring
def get_current_url(self):
    """the current URL + next."""
    return request.base_url + '?next=' + url_quote(self.get_next_url())
def metrics():
    """Mock of the YARN cluster metrics REST resource."""
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    d = st.fixed_dictionaries({
        'activeNodes': st.integers(0),
        'allocatedMB': st.integers(0),
        'allocatedVirtualCores': st.integers(0),
        'appsCompleted': st.integers(0),
        'appsFailed': st.integers(0),
        'appsKilled': st.integers(0),
        'appsPending': st.integers(0),
        'appsRunning': st.integers(0),
        'appsSubmitted': st.integers(0),
        'availableMB': st.integers(0),
        'availableVirtualCores': st.integers(0),
        'containersAllocated': st.integers(0),
        'containersPending': st.integers(0),
        'containersReserved': st.integers(0),
        'decommissionedNodes': st.integers(0),
        'lostNodes': st.integers(0),
        'rebootedNodes': st.integers(0),
        'reservedMB': st.integers(0),
        'reservedVirtualCores': st.integers(0),
        'totalMB': st.integers(0),
        'totalNodes': st.integers(0),
        'totalVirtualCores': st.integers(0),
        'unhealthyNodes': st.integers(0)
    })

    result = json.dumps({
        'clusterMetrics': d.example()
    })
    redis.set(request.base_url, result)
    return jsonify(result)
def get(self, uri=None):
    urlbase = request.base_url
    return dict([(urls[i + 1].__name__.split(".")[-1].lower(), urlbase + urls[i])
                 for i in range(len(urls))[::2]])
def home():
    html = 'HERE CAN BE PHISHING PAGE FOR {}'.format(request.base_url)
    return html
def index():
    # wishlist_url = request.base_url + 'wishlists'
    # return (jsonify(service='wishlists', version='0.1',
    #                 url=wishlist_url), HTTP_200_OK)
    return app.send_static_file('index.html')
def _preemptive_unless(base_url=None, additional_unless=None):
    if base_url is None:
        base_url = request.url_root

    disabled_for_root = not settings().getBoolean(["devel", "cache", "preemptive"]) \
                        or base_url in settings().get(["server", "preemptiveCache", "exceptions"]) \
                        or not (base_url.startswith("http://") or base_url.startswith("https://"))

    recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no"

    if callable(additional_unless):
        return recording_disabled or disabled_for_root or additional_unless()
    else:
        return recording_disabled or disabled_for_root
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None):
    if path is None:
        path = request.path
    if base_url is None:
        base_url = request.url_root

    d = dict(path=path,
             base_url=base_url,
             query_string="l10n={}".format(g.locale.language if g.locale else "en"))

    if key != "_default":
        d["plugin"] = key

    # add data if we have any
    if data is not None:
        try:
            if callable(data):
                data = data()
            if data:
                if "query_string" in data:
                    data["query_string"] = "l10n={}&{}".format(g.locale.language, data["query_string"])
                d.update(data)
        except:
            _logger.exception("Error collecting data for preemptive cache from plugin {}".format(key))

    # add additional request data if we have any
    if callable(additional_request_data):
        try:
            ard = additional_request_data()
            if ard:
                d.update(dict(
                    _additional_request_data=ard
                ))
        except:
            _logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key))

    return d
def next_url(limit=None, offset=None):
    limit = limit or request.values.get('limit', 30, type=int)
    offset = offset or request.values.get('offset', 0, type=int)
    values = request.values.to_dict()
    values['offset'] = limit + offset
    return request.base_url + '?' + urllib.urlencode(values)
def _get_email_validated_url(is_valid: bool) -> str:
    """ Helper function to generate redirect url for email verification """
    base_url = current_app.config['APP_BASE_URL']
    verification_params = {'is_valid': is_valid}
    verification_url = '{0}/validate-email?{1}'.format(base_url,
                                                       urllib.parse.urlencode(verification_params))
    return verification_url
def get_authentication_failed_url():
    """ Generates the auth-failed URL for the running app """
    base_url = current_app.config['APP_BASE_URL']
    auth_failed_url = f'{base_url}/auth-failed'
    return auth_failed_url
def generate_authorized_url(username, session_token, redirect_to):
    """ Generate URL that we'll redirect the user to once authenticated """
    base_url = current_app.config['APP_BASE_URL']

    redirect_query = ''
    if redirect_to:
        redirect_query = f'&redirect_to={urllib.parse.quote(redirect_to)}'

    # Trailing & added as Angular a bit flaky with parsing querystring
    authorized_url = f'{base_url}/authorized?username={urllib.parse.quote(username)}&session_token={session_token}&ng=0' \
                     f'{redirect_query}'

    return authorized_url
def web_node_all():
    nodes_info = BP.nodes
    for node in nodes_info:
        status = get_node_status(node.coordinate)
        if status:
            node.manifest = status['manifest']
            node.status = status['status']
    return render_template(
        _ERS_element + '_all.tpl',
        label=__doc__,
        nodes=BP.nodes,
        base_url=request.url)
def web_node_button_action(name=None):
    # Either way, name has no leading slash.
    if 'unbind' in request.form:
        delete_node_binding(name)
    elif 'bind' in request.form:
        manname = request.form['manifest_sel']
        manifest = BP.manifest_lookup(manname)
        build_node(manifest, name)
    return redirect(request.base_url)   # Eliminates browser caching of POST
def root():
    return render_template(
        'index.tpl',
        api_version=mainapp.config['API_VERSION'],
        base_url=request.base_url,
        mirror=mainapp.config['L4TM_MIRROR'],
        release=mainapp.config['L4TM_RELEASE'],
        rules=mainapp.config['rules'],
        url_root=request.url_root,
        coordinate=mainapp.config['tmconfig'].racks[1]['coordinate'])

###########################################################################
# Networking stuff
def article(article_id):
    comment_form = CommentForm()
    if comment_form.validate_on_submit():
        from MagicPress.utils.tasks import send_async_email
        new_comment = Comment(username=comment_form.name.data)
        new_comment.text = comment_form.text.data
        new_comment.create_time = datetime.utcnow()
        new_comment.site = comment_form.site.data
        new_comment.email = comment_form.email.data
        new_comment.ip = request.remote_addr
        new_comment.language = request.accept_languages.best
        new_comment.os = request.user_agent.platform
        new_comment.browser = request.user_agent.browser
        new_comment.article_id = str(request.base_url).split('/')[-1]
        info = get_ip_info(request.remote_addr)
        new_comment.location = info['country'] + info['region'] + info['city']
        new_comment.network = info['isp']
        if gfw.filter(comment_form.text.data) or gfw.filter(comment_form.name.data):
            new_comment.hidden = False
            flash(u'????????????')
        else:
            new_comment.hidden = True
            message_details = {}
            message_details['subject'] = 'New Comment'
            message_details['recipients'] = [current_app.config['ADMIN_EMAIL']]
            message_details['body'] = "Name: %s\nEmail: %s\nSite: %s\nLocation: %s\n" \
                                      "Hihhen: %s\nText:\n\n %s" % (
                                          comment_form.name.data,
                                          current_app.config['ADMIN_EMAIL'],
                                          comment_form.site.data,
                                          info['country'] + info['region'] + info['city'],
                                          str(new_comment.hidden),
                                          comment_form.text.data)
            send_async_email.delay(message_details)
        db.session.add(new_comment)
        db.session.commit()
    the_article = Article.query.filter_by(id=article_id).first()
    next_article = db.session.query(Article).filter(Article.id < article_id,
                                                    Article.state == True).order_by(Article.id.desc()).first()
    pre_article = db.session.query(Article).filter(Article.id > article_id,
                                                   Article.state == True).order_by(Article.id.asc()).first()
    comments = Comment.query.filter_by(article_id=article_id, hidden=True).all()
    return render_template(get_theme() + '/article.html',
                           article=the_article,
                           next_article=next_article,
                           pre_article=pre_article,
                           comment_form=comment_form,
                           comments=comments)
def get_swaggerui_blueprint(base_url, api_url, config=None, oauth_config=None):

    swagger_ui = Blueprint('swagger_ui',
                           __name__,
                           static_folder='dist',
                           template_folder='templates')

    default_config = {
        'app_name': 'Swagger UI',
        'dom_id': '#swagger-ui',
        'url': api_url,
        'layout': 'StandaloneLayout'
    }

    if config:
        default_config.update(config)

    fields = {
        # Some fields are used directly in template
        'base_url': base_url,
        'app_name': default_config.pop('app_name'),
        # Rest are just serialized into json string for inclusion in the .js file
        'config_json': json.dumps(default_config),
    }
    if oauth_config:
        fields['oauth_config_json'] = json.dumps(oauth_config)

    @swagger_ui.route('/')
    @swagger_ui.route('/<path:path>')
    def show(path=None):
        if not path or path == 'index.html':
            if not default_config.get('oauth2RedirectUrl', None):
                default_config.update(
                    {"oauth2RedirectUrl": "%s/oauth2-redirect.html" % request.base_url}
                )
                fields['config_json'] = json.dumps(default_config)
            return render_template('index.template.html', **fields)
        else:
            return send_from_directory(
                # A bit of a hack to not pollute the default /static path with our files.
                os.path.join(
                    swagger_ui.root_path,
                    swagger_ui._static_folder
                ),
                path
            )

    return swagger_ui
def applications():
    """Mock of the YARN cluster apps REST resource."""
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    d = st.fixed_dictionaries({
        'allocatedMB': st.integers(-1),
        'allocatedVCores': st.integers(-1),
        'amContainerLogs': st.text(),
        'amHostHttpAddress': st.text(),
        'applicationTags': st.text(),
        'applicationType': st.sampled_from(['MAPREDUCE', 'SPARK']),
        'clusterId': st.integers(0),
        'diagnostics': st.text(),
        'elapsedTime': st.integers(0),
        'finalStatus': st.sampled_from(['UNDEFINED', 'SUCCEEDED', 'FAILED', 'KILLED']),
        'finishedTime': st.integers(0),
        'id': st.text(string.ascii_letters, min_size=5, max_size=25),
        'memorySeconds': st.integers(0),
        'name': st.text(min_size=5),
        'numAMContainerPreempted': st.integers(0),
        'numNonAMContainerPreempted': st.integers(0),
        'preemptedResourceMB': st.integers(0),
        'preemptedResourceVCores': st.integers(0),
        'progress': st.floats(0, 100),
        'queue': st.text(),
        'runningContainers': st.integers(-1),
        'startedTime': st.integers(0),
        'state': st.sampled_from(['NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED',
                                  'RUNNING', 'FINISHED', 'FAILED', 'KILLED']),
        'trackingUI': st.text(),
        'trackingUrl': st.just(os.environ['YARN_ENDPOINT']),
        'user': st.text(),
        'vcoreSeconds': st.integers(0)
    })

    result = json.dumps({
        'apps': {
            'app': st.lists(d, min_size=4, average_size=10).example()
        }
    })
    redis.set(request.base_url, result)
    return jsonify(result)
def mapreduce_application():
    """Mock of the mapreduce jobs REST resource."""
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    d = st.fixed_dictionaries({
        'startTime': st.integers(0),
        'finishTime': st.integers(0),
        'elapsedTime': st.integers(0),
        'id': st.integers(0),
        'name': st.text(),
        'user': st.text(),
        'state': st.sampled_from(['NEW', 'SUCCEEDED', 'RUNNING', 'FAILED', 'KILLED']),
        'mapsTotal': st.integers(0),
        'mapsCompleted': st.integers(0),
        'reducesTotal': st.integers(0),
        'reducesCompleted': st.integers(0),
        'mapProgress': st.floats(0, 100),
        'reduceProgress': st.floats(0, 100),
        'mapsPending': st.integers(0),
        'mapsRunning': st.integers(0),
        'reducesPending': st.integers(0),
        'reducesRunning': st.integers(0),
        'uberized': st.booleans(),
        'diagnostics': st.text(),
        'newReduceAttempts': st.integers(0),
        'runningReduceAttempts': st.integers(0),
        'failedReduceAttempts': st.integers(0),
        'killedReduceAttempts': st.integers(0),
        'successfulReduceAttempts': st.integers(0),
        'newMapAttempts': st.integers(0),
        'runningMapAttempts': st.integers(0),
        'failedMapAttempts': st.integers(0),
        'killedMapAttempts': st.integers(0),
        'successfulMapAttempts': st.integers(0)
    })

    result = json.dumps({
        'jobs': {
            'job': st.lists(d, average_size=3).example()
        }
    })
    redis.set(request.base_url, result)
    return jsonify(result)
def list(select, request):
    '''Returns a styled list from the data passed.

    The pagination style is defined in the request data. The default values
    of `number` and `page` will be 10 and 1, respectively. If these
    parameters are passed as data in the request, then the values will be
    updated accordingly.

    Keyword arguments:
    select -- A database query of data.
    request -- A request of some type.
    '''
    number = 10
    page = 1

    for key in request.values:
        if key == 'number':
            number = int(request.values.get('number'))
        elif key == 'page':
            page = int(request.values.get('page'))

    '''Call peewee paginate method on the query.'''
    arr = []
    for i in select.paginate(page, number):
        arr.append(i.to_dict())

    '''By default, `next_page_path` and `prev_page_path` are None.'''
    next_page_path = None
    prev_page_path = None
    base_path = request.base_url + "?page="
    end_path = "&number=" + str(number)

    '''Update `next_page_path` and `prev_page_path` if there is data on
    either a next or previous page from the pagination.
    '''
    if len(arr) == number:
        next_page_path = base_path + str(page + 1) + end_path
    if page > 1:
        prev_page_path = base_path + str(page - 1) + end_path

    '''Return an array of dicts, containing the data and pagination.'''
    data = [dict(data=arr)]
    data.append(dict(paging=dict(next=next_page_path, previous=prev_page_path)))

    return data
def in_cache():
    url = request.base_url.replace("/cached.gif", "/")
    path = request.path.replace("/cached.gif", "/")
    base_url = request.url_root

    # select view from plugins and fall back on default view if no plugin will handle it
    ui_plugins = pluginManager.get_implementations(octoprint.plugin.UiPlugin,
                                                   sorting_context="UiPlugin.on_ui_render")
    for plugin in ui_plugins:
        if plugin.will_handle_ui(request):
            ui = plugin._identifier
            key = _cache_key(plugin._identifier,
                             url=url,
                             additional_key_data=plugin.get_ui_additional_key_data_for_cache)
            unless = _preemptive_unless(url,
                                        additional_unless=plugin.get_ui_preemptive_caching_additional_unless)
            data = _preemptive_data(plugin._identifier,
                                    path=path,
                                    base_url=base_url,
                                    data=plugin.get_ui_data_for_preemptive_caching,
                                    additional_request_data=plugin.get_ui_additional_request_data_for_preemptive_caching)
            break
    else:
        ui = "_default"
        key = _cache_key("_default", url=url)
        unless = _preemptive_unless(url)
        data = _preemptive_data("_default", path=path, base_url=base_url)

    response = make_response(bytes(base64.b64decode("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7")))
    response.headers["Content-Type"] = "image/gif"

    if unless or not preemptiveCache.has_record(data, root=path):
        _logger.info("Preemptive cache not active for path {}, ui {} and data {!r}, signaling as cached".format(path, ui, data))
        return response
    elif util.flask.is_in_cache(key):
        _logger.info("Found path {} in cache (key: {}), signaling as cached".format(path, key))
        return response
    elif util.flask.is_cache_bypassed(key):
        _logger.info("Path {} was bypassed from cache (key: {}), signaling as cached".format(path, key))
        return response
    else:
        _logger.debug("Path {} not yet cached (key: {}), signaling as missing".format(path, key))
        return abort(404)
def main_page():
    """Renders the main page. When this page is shown, we create a new
    channel to push asynchronous updates to the client."""
    user = users.get_current_user()
    game_key = request.args.get('g')

    if not game_key:
        game_key = user.user_id()
        game = Game(id=game_key, userX=user, moveX=True, board=' ' * 9)
        game.put()
    else:
        game = Game.get_by_id(game_key)
        if not game:
            return 'No such game', 404
        if not game.userO:
            game.userO = user
            game.put()

    # [START pass_token]
    # choose a unique identifier for channel_id
    channel_id = user.user_id() + game_key
    # encrypt the channel_id and send it as a custom token to the
    # client
    # Firebase's data security rules will be able to decrypt the
    # token and prevent unauthorized access
    client_auth_token = create_custom_token(channel_id)

    _send_firebase_message(channel_id, message=game.to_json())

    # game_link is a url that you can open in another browser to play
    # against this player
    game_link = '{}?g={}'.format(request.base_url, game_key)

    # push all the data to the html template so the client will
    # have access
    template_values = {
        'token': client_auth_token,
        'channel_id': channel_id,
        'me': user.user_id(),
        'game_key': game_key,
        'game_link': game_link,
        'initial_message': urllib.unquote(game.to_json())
    }

    return flask.render_template('fire_index.html', **template_values)
# [END pass_token]
def add_pillar_request_to_notification(notification):
    """Adds request metadata to the Bugsnag notifications.

    This basically copies bugsnag.flask.add_flask_request_to_notification,
    but is altered to include Pillar-specific metadata.
    """
    from flask import request, session
    from bugsnag.wsgi import request_path
    import pillar.auth

    if not request:
        return

    notification.context = "%s %s" % (request.method,
                                      request_path(request.environ))

    if 'id' not in notification.user:
        user: pillar.auth.UserClass = pillar.auth.current_user._get_current_object()
        notification.set_user(id=user.user_id, email=user.email, name=user.username)
        notification.user['roles'] = sorted(user.roles)
        notification.user['capabilities'] = sorted(user.capabilities)

    session_dict = dict(session)
    for key in SESSION_KEYS_TO_REMOVE:
        try:
            del session_dict[key]
        except KeyError:
            pass
    notification.add_tab("session", session_dict)
    notification.add_tab("environment", dict(request.environ))

    remote_addr = request.remote_addr
    forwarded_for = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        remote_addr = f'{forwarded_for} (proxied via {remote_addr})'

    notification.add_tab("request", {
        "method": request.method,
        "url": request.base_url,
        "headers": dict(request.headers),
        "params": dict(request.form),
        "data": {'request.data': request.data,
                 'request.json': request.get_json()},
        "endpoint": request.endpoint,
        "remote_addr": remote_addr,
    })
def web_node_status(name=None):
    '''name will never have a leading / but now always needs one.'''
    name = '/' + name
    try:
        node = BP.nodes[name][0]

        ESPURL = None       # testable value in Jinja2
        ESPsizeMB = 0
        installsh = installlog = None

        status = get_node_status(name)
        if status is not None:
            if status['status'] == 'ready':
                ESPpath = '%s/%s/%s.ESP' % (
                    BP.config['TFTP_IMAGES'], node.hostname, node.hostname)
                if os.path.isfile(ESPpath):
                    prefix = request.url.split(_ERS_element)[0]
                    ESPURL = '%s%s/ESP/%s' % (
                        prefix, _ERS_element, node.hostname)
                    ESPsizeMB = os.stat(ESPpath).st_size >> 20

            if status['status'] in ('building', 'ready'):
                installpath = '%s/%s/untar/root' % (
                    BP.config['FILESYSTEM_IMAGES'], node.hostname)
                try:
                    with open(installpath + '/install.sh') as f:
                        installsh = f.read()
                    with open(installpath + '/install.log') as f:
                        installlog = f.read()
                except Exception as e:
                    installsh = installlog = None
                    pass

        # all manifests' names with namespace
        manifests = sorted(BP.blueprints['manifest'].get_all())

        return render_template(
            _ERS_element + '.tpl',
            label=__doc__,
            node=node,
            manifests=manifests,
            status=status,
            base_url=request.url.split(name)[0],
            ESPURL=ESPURL,
            ESPsizeMB=ESPsizeMB,
            installsh=installsh,
            installlog=installlog
        )
    except Exception as e:
        return make_response('Kaboom: %s' % str(e), 404)