我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 flask.request.url 属性。
def bookmarklet_js():
    """Render the bookmarklet script with its static-assets base URL.

    The base URL is derived from the current request URL by swapping the
    bookmarklet path for the static browser-tools directory.

    :return: a 200 response with mimetype ``application/javascript``
    """
    static_base = request.url.replace(
        "browser-tools/bookmarklet.js", "static/browser-tools/"
    )
    running_locally = "localhost:" in static_base
    if not running_locally:
        # Workaround kept from the original author: request.url seems to
        # arrive as http even when the server was asked for https.
        static_base = static_base.replace("http://", "https://")

    script = render_template(
        "browser-tools/bookmarklet.js", base_url=static_base
    )
    response = make_response(script, 200)
    response.mimetype = "application/javascript"
    return response
def showTimeMap(urir, format):
    """Return a TimeMap for ``urir`` in the requested serialization.

    :param urir: the original-resource URI (completed via getCompleteURI)
    :param format: ``'link'`` or ``'cdxj'``
    :return: a Response carrying the TimeMap, or a 400 response when
        ``format`` is not one of the supported values
    """
    urir = getCompleteURI(urir)
    s = surt.surt(urir, path_strip_trailing_slash_unless_empty=False)
    indexPath = ipwbConfig.getIPWBReplayIndexPath()

    cdxjLinesWithURIR = getCDXJLinesWithURIR(urir, indexPath)

    if format == 'link':
        tm = generateLinkTimeMapFromCDXJLines(
            cdxjLinesWithURIR, s, request.url)
        tmContentType = 'application/link-format'
    elif format == 'cdxj':
        tm = generateCDXJTimeMapFromCDXJLines(
            cdxjLinesWithURIR, s, request.url)
        tmContentType = 'application/cdxj+ors'
    else:
        # Previously `tm` stayed unbound here and the request crashed
        # with an UnboundLocalError; reject the request explicitly.
        return Response('Unsupported TimeMap format', status=400)

    resp = Response(tm)
    resp.headers['Content-Type'] = tmContentType

    return resp
def update_content_in_local_cache(url, content, method='GET'):
    """Replace the body of an already-cached response for ``url``.

    Does nothing unless the local cache is enabled, ``method`` is GET and
    ``url`` is already cached.

    NOTE(review): the original docstring and comments were lost to an
    encoding problem; this describes only what the code visibly does.

    :param url: cache key of the response to update
    :param content: new response body
    :param method: HTTP method of the request; only 'GET' is handled
    """
    should_update = (
        local_cache_enable and method == 'GET' and cache.is_cached(url)
    )
    if not should_update:
        return

    cached_info = cache.get_info(url)
    cached_resp = cache.get_obj(url)
    cached_resp.set_data(content)

    # The cached entry now carries a body, so clear the marker flag.
    cached_info['without_content'] = False

    if verbose_level >= 4:
        dbgprint('LocalCache_UpdateCache', url, content[:30], len(content))

    cache.put_obj(
        url,
        cached_resp,
        obj_size=len(content),
        expires=get_expire_from_mime(parse.mime),
        last_modified=cached_info.get('last_modified'),
        info_dict=cached_info,
    )
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Reduce a full URL to its path plus query string, e.g.
        http://foo.bar.com/aaa/p.html?x=y  ->  /aaa/p.html?x=y

    :param full_url: full url; defaults to the current ``request.url``
    :type full_url: str
    :param no_query: when true, drop the query string from the result
    :return: str
    """
    target = request.url if full_url is None else full_url
    parts = urlsplit(target)

    path = parts.path if parts.path else "/"
    if no_query or not parts.query:
        return path
    return path + '?' + parts.query


# ################# End Client Request Handler #################


# ################# Begin Middle Functions #################
def request_remote_site():
    """
    Send the client's request to the remote site and store the result in
    ``parse.remote_response``; on a 4xx/5xx answer, try domain guessing.

    NOTE(review): the original docstring and comments were lost to an
    encoding problem; this describes only what the code visibly does.
    """
    parse.remote_response = send_request(
        parse.remote_url,
        method=request.method,
        headers=parse.client_header,
        data=parse.request_data_encoded,
    )

    final_url = parse.remote_response.url
    if final_url != parse.remote_url:
        warnprint("requests's remote url", final_url,
                  'does no equals our rewrited url', parse.remote_url)

    status = parse.remote_response.status_code
    if 400 <= status <= 599:
        # Error status: see whether another domain serves this URL.
        dbgprint("Domain guessing for", request.url)
        guessed = guess_correct_domain()
        if guessed is not None:
            parse.remote_response = guessed
def advisory_atom():
    """Serve an Atom feed of the most recently published advisories.

    :return: the feed's HTTP response
    """
    last_recent_entries = 15
    data = get_advisory_data()['published'][:last_recent_entries]
    feed = AtomFeed('Arch Linux Security - Recent advisories',
                    feed_url=request.url, url=request.url_root)

    for entry in data:
        advisory = entry['advisory']
        package = entry['package']
        title = '[{}] {}: {}'.format(
            advisory.id, package.pkgname, advisory.advisory_type)

        feed.add(title=title,
                 content=render_template('feed.html', content=advisory.content),
                 content_type='html',
                 summary=render_template('feed.html', content=advisory.impact),
                 # Was misspelled `summary_tpe`, so the keyword was never
                 # the one the feed entry looks for.
                 summary_type='html',
                 author='Arch Linux Security Team',
                 url=TRACKER_ISSUE_URL.format(advisory.id),
                 published=advisory.created,
                 updated=advisory.created)
    return feed.get_response()
def predict():
    """Classify an uploaded image and return the prediction as JSON.

    Redirects back to the current URL with a flashed message when no usable
    file was uploaded. Prediction errors are reported in the ``msg`` field
    of the JSON payload rather than raised.

    :return: a redirect, or JSON with ``name``, ``description`` and ``msg``
    """
    # A leftover `ipdb.set_trace()` breakpoint was removed here: it would
    # block every request waiting on an interactive debugger.
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)

    upload = request.files['file']
    if upload.filename == '':
        flash('No selected file')
        return redirect(request.url)

    if not (upload and allowed_file(upload.filename)):
        # Previously this path fell off the end and returned None, which
        # is not a valid view response.
        flash('File type not allowed')
        return redirect(request.url)

    try:
        pokemon_name = predict_mlp(upload).capitalize()
        pokemon_desc = pokemon_entries.get(pokemon_name)
        msg = ""
    except Exception as e:
        pokemon_name = None
        pokemon_desc = None
        msg = str(e)

    return jsonify({'name': pokemon_name,
                    'description': pokemon_desc,
                    "msg": msg})
def rss_feed():
    """Serve an Atom feed of WhiteHouse documents, newest first.

    :return: the feed's HTTP response
    """
    atom = AtomFeed('White House Briefing Room Releases',
                    feed_url=request.url, url=request.url_root)

    newest_first = WhiteHouse.query.order_by(
        WhiteHouse.document_date.desc())
    for doc in newest_first:
        atom.add(
            doc.title,
            doc.tweet,
            content_type='text',
            author="@presproject2017",
            url=make_external(doc.full_url),
            updated=doc.document_date,
            published=doc.document_date,
        )
    return atom.get_response()
def retrieveAlertsCountWithType():
    """Return the alert count for a timeframe, split by honeypot type.

    The timeframe comes from the GET parameter ``time`` (a decimal number
    of minutes or the string "day"). Results are cached per request URL.
    """
    # Serve from cache when possible.
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)

    # Cache miss: the query needs the `time` parameter.
    timeframe = request.args.get('time')
    if not timeframe:
        app.logger.error('No time GET-parameter supplied in retrieveAlertsCountWithType. Must be decimal number (in minutes) or string "day"')
        return app.config['DEFAULTRESPONSE']

    returnResult = formatAlertsCountWithType(
        queryAlertsCountWithType(timeframe, checkCommunityIndex(request)))
    setCache(request.url, returnResult, 13, "url")
    app.logger.debug('UNCACHED %s' % str(request.url))
    return jsonify(returnResult)
def retrieveDatasetAlertsPerMonth():
    """Return attacks per day as JSON for the last ``days`` days.

    Without a ``days`` GET parameter, ``None`` is passed to the query
    (the original comments call this "within the last month"). Results
    are cached per request URL.
    """
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)

    # A missing or empty parameter selects the query's default window.
    days = request.args.get('days') or None
    returnResult = formatDatasetAlertsPerMonth(
        queryDatasetAlertsPerMonth(days, checkCommunityIndex(request)))
    setCache(request.url, returnResult, 600, "url")
    return jsonify(returnResult)
def retrieveDatasetAlertTypesPerMonth():
    """Return attacks per day, split by attack group, as JSON.

    Without a ``days`` GET parameter, ``None`` is passed to the query
    (the original comments call this "within the last month"). Results
    are cached per request URL.
    """
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)

    # A missing or empty parameter selects the query's default window.
    days = request.args.get('days') or None
    returnResult = formatDatasetAlertTypesPerMonth(
        queryDatasetAlertTypesPerMonth(days, checkCommunityIndex(request)))
    setCache(request.url, returnResult, 3600, "url")
    return jsonify(returnResult)
def retrieveAlertStats():
    """Return the combined alert statistics as JSON.

    Covers AlertsLastMinute, AlertsLastHour and AlertsLast24Hours.
    Results are cached per request URL.
    """
    cached = getCache(request.url, "url")
    if cached is not False:
        return jsonify(cached)

    # Cache miss: query, cache and log.
    community = checkCommunityIndex(request)
    returnResult = formatAlertStats(queryAlertStats(community))
    setCache(request.url, returnResult, 13, "url")
    app.logger.debug('UNCACHED %s' % str(request.url))
    return jsonify(returnResult)
def get_inbox():
    """Serialize the inbox graph in the format negotiated via ``Accept``.

    A missing or empty ``Accept`` header, ``*/*`` or anything containing
    ``text/html`` yields JSON-LD. Any other value must be one of
    ACCEPTED_TYPES.

    :return: the serialized inbox, or a 415 tuple for unsupported formats
    """
    # Use .get(): indexing request.headers raises when the header is
    # absent, which defeated the "no Accept header" branch below.
    accept = request.headers.get('Accept')
    pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, accept))

    if not accept or accept == '*/*' or 'text/html' in accept:
        resp = make_response(inbox_graph.serialize(format='application/ld+json'))
        resp.headers['Content-Type'] = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        resp = make_response(inbox_graph.serialize(format=accept))
        resp.headers['Content-Type'] = accept
    else:
        return 'Requested format unavailable', 415

    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST"
    resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"'
    resp.headers['Accept-Post'] = 'application/ld+json, text/turtle'

    return resp
def get_notification(id):
    """Serialize one notification graph in the negotiated format.

    :param id: notification identifier, appended to the inbox URL to form
        the key into ``graphs``
    :return: the serialized graph, a 404 tuple when the graph is unknown,
        or a 415 tuple when the requested format is not supported
    """
    pyldnlog.debug("Requested notification data of {}".format(request.url))
    pyldnlog.debug("Headers: {}".format(request.headers))

    # The named graph must exist.
    graph_key = pyldnconf._inbox_url + id
    pyldnlog.debug("Dict key is {}".format(graph_key))
    if graph_key not in graphs:
        return 'Requested notification does not exist', 404

    accept = request.headers.get('Accept')
    if accept is None or accept == '*/*' or 'text/html' in accept:
        chosen_format = 'application/ld+json'
    elif accept in ACCEPTED_TYPES:
        chosen_format = accept
    else:
        return 'Requested format unavailable', 415

    resp = make_response(graphs[graph_key].serialize(format=chosen_format))
    resp.headers['Content-Type'] = chosen_format
    resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn'
    resp.headers['Allow'] = "GET"

    return resp
def register_extension(app):
    """Install request hooks that track a correlation id per request.

    :param app: the application to register the hooks on
    """

    @app.before_request
    def add_correlation_id(*args, **kw):
        """Reuse the incoming correlation id or create a fresh UUID4."""
        log.debug("%s %s", request.method, request.url)
        incoming = request.headers.get(CORRELATION_ID)
        if incoming:
            correlation_id = incoming
        else:
            correlation_id = str(uuid.uuid4())
            if request.method != "GET":
                # TODO: remove sensitive information such as
                # username/password
                tracking_record = {
                    "message": "Tracking request",
                    "correlation_id": correlation_id,
                    "method": request.method,
                    "uri": request.url,
                    "data": request.data,
                }
                log.debug(tracking_record)
        request.correlation_id = correlation_id

    @app.after_request
    def save_correlation_id(response):
        """Copy the correlation id onto the response unless already set."""
        already_set = CORRELATION_ID in response.headers
        if not already_set:
            response.headers[CORRELATION_ID] = getattr(
                request, "correlation_id", None)
        return response
def _provide_client_handler(self, section, name, kwargs_updator=None): def _wrapper(func): @wraps(func) def _handler(**kwargs): client_key = self.auth.authenticate( request.method, request.url, request.headers) client = self.client_class.load(client_key) if not client: abort(401) g.ac_client = client kwargs['client'] = client if kwargs_updator: kwargs.update(kwargs_updator(**kwargs)) ret = func(**kwargs) if ret is not None: return ret return '', 204 self._add_handler(section, name, _handler) return func return _wrapper
def error_mail(subject, data, r, via_web=True):
    """Mail a report about a failed remote request.

    :param subject: mail subject
    :param data: the request data that was sent
    :param r: the response object (url, status_code, headers, text)
    :param via_web: when true and a request context exists, prefix the
        report with the site URL and the current user
    """
    report = '''
remote URL: {r.url}
status code: {r.status_code}

request data:
{data}

status code: {r.status_code}
content-type: {r.headers[content-type]}

reply:
{r.text}
'''.format(r=r, data=data)

    # Site details are only available inside a request context.
    if via_web and has_request_context():
        user = get_username()
        header = 'site URL: {}\nuser: {}\n'.format(request.url, user)
        report = header + report

    send_mail(subject, report)
def announce_change(change):
    """Mail a summary of a change.

    :param change: the change to announce; its place, user, update count,
        comment and id are included in the mail
    """
    place = change.place
    template = '''
user: {change.user.username}
name: {name}
page: {url}
items: {change.update_count}
comment: {change.comment}

https://www.openstreetmap.org/changeset/{change.id}

'''
    body = template.format(
        name=place.display_name,
        url=place.candidates_url(_external=True),
        change=change,
    )
    subject = 'tags added: {}'.format(place.name_for_changeset)
    send_mail(subject, body)
def open_changeset_error(place, changeset, r):
    """Mail a report about a failed attempt to open a changeset.

    :param place: the place the changeset was for
    :param changeset: the payload that was sent
    :param r: the response object; its text is included in the report
    """
    # The template used `{change.user.username}`, but no `change` was ever
    # passed to format(), so building the report raised KeyError. The user
    # is now looked up the same way error_mail does it.
    # NOTE(review): assumes get_username()/has_request_context() are in
    # scope here as they are for error_mail - confirm.
    user = get_username() if has_request_context() else None

    template = '''
user: {user}
name: {name}
page: {url}

sent:

{sent}

reply:

{reply}

'''
    body = template.format(user=user,
                           name=place.display_name,
                           url=place.candidates_url(_external=True),
                           sent=changeset,
                           reply=r.text)

    send_mail('error creating changeset:' + place.name, body)
def log_exception(self, exc_info):
    """Log an exception together with details of the current request.

    :param exc_info: exception info forwarded to the logger
    """
    agent = request.user_agent
    details = (
        request.path,
        request.method,
        request.remote_addr,
        agent.string,
        agent.platform,
        agent.browser,
        agent.version,
        dict(request.args),
        request.view_args,
        request.url,
    )
    message = """
Path:                 %s
HTTP Method:          %s
Client IP Address:    %s
User Agent:           %s
User Platform:        %s
User Browser:         %s
User Browser Version: %s
GET args:             %s
view args:            %s
URL:                  %s
""" % details
    self.logger.error(message, exc_info=exc_info)
def authenticated(fn):
    """Mark a route as requiring authentication.

    Unauthenticated sessions are redirected to the login page. Sessions
    missing a name, email or institution are redirected to the profile
    page, except on '/logout' and '/profile'. Both redirects carry the
    current URL as ``next``.
    """
    profile_fields = ('name', 'email', 'institution')

    @wraps(fn)
    def decorated_function(*args, **kwargs):
        if not session.get('is_authenticated'):
            return redirect(url_for('login', next=request.url))

        # Logout and the profile page stay reachable with an
        # incomplete profile.
        exempt = request.path in ('/logout', '/profile')
        if not exempt and not all(session.get(f) for f in profile_fields):
            return redirect(url_for('profile', next=request.url))

        return fn(*args, **kwargs)

    return decorated_function
def login_required(func):
    """
    Decorator that requires login, remembering the requested URL in the
    session so the user can be sent back there after logging in
    """

    @wraps(func)
    def decorated_function(*args, **kwargs):
        """Run the view when logged in, otherwise redirect to login"""
        if is_logged_in():
            return func(*args, **kwargs)

        # Remember where the user was heading.
        session['previously_requested_page'] = request.url
        return redirect(url_for('login'))

    return decorated_function
def set_featured_title():
    """Handle the form POST that updates the featured title"""
    wanted_title = request.form['title']
    wanted_stack = request.form['stack']

    found = models.search_for_article(
        wanted_title, stacks=[wanted_stack], status=PUBLISHED)

    if found is not None:
        models.set_featured_article(found)
        flash('Featured guide updated', category='info')
        return redirect(url_for('index'))

    flash('Cannot find published guide "%s" stack "%s"' % (wanted_title, wanted_stack),
          category='error')

    # Go back to the remembered page, falling back to the index.
    target = session.pop('previously_requested_page', None)
    return redirect(url_for('index') if target is None else target)
def get_social_redirect_url(article, share_domain):
    """
    Build the po.st social redirect url for an article so that share
    counts follow us regardless of where we're hosted.
    """
    # Drop one trailing slash so the resulting url has no double slash.
    domain = share_domain[:-1] if share_domain.endswith('/') else share_domain

    article_path = filters.url_for_article(article)

    # The full domain is used because it controls the po.st sharing
    # numbers; the counts should stick with the domain we run on.
    full_url = url_for_domain(article_path, domain=domain)
    return strip_subfolder(full_url)
def strip_subfolder(url):
    """
    Remove the configured subfolder prefix from the url's path, if present,
    so the exact same share url is always used for saving counts.
    """
    subfolder = app.config.get('SUBFOLDER', None)
    if subfolder:
        parsed = urlparse.urlparse(url)
        if parsed.path.startswith(subfolder):
            # Only the leading occurrence is removed.
            trimmed_path = parsed.path.replace('%s' % (subfolder), '', 1)
            rebuilt = urlparse.ParseResult(
                parsed.scheme, parsed.netloc, trimmed_path,
                parsed.params, parsed.query, parsed.fragment)
            return rebuilt.geturl()

    return url
def require_login_frontend(only_if=True):
    """
    Frontend counterpart of the API require_login decorator: instead of an
    API error it redirects to the login page, passing the current URL as a
    post-login redirect_url GET parameter.

    :param only_if: Optional boolean condition; login is required for the
                    decorated view only when it is true, otherwise no
                    login is required.
    """
    def decorator(func):
        @wraps(func)
        def decorated_view(*args, **kwargs):
            if current_user.is_authenticated or not only_if:
                return func(*args, **kwargs)

            # Fully percent-encode the URL for use as a query value.
            encoded_target = quote(request.url, safe='')
            return redirect(UserLoginInterfaceURI.uri(redirect_url=encoded_target))
        return decorated_view
    return decorator
def curl():
    """Purge the submitted URLs from the CDN and report each result.

    Blank lines and lines starting with '#' are skipped. The first URL that
    does not start with 'http' aborts processing with a message.
    """
    form = MyForm.MyForm_input()
    if not form.submit.data:
        return render_template('cdn.html', form=form, Main_Infos=g.main_infos)

    unique_urls = set(form.text.data.strip().splitlines())
    for raw_url in unique_urls:
        Purge = purge.Purged()
        if not raw_url or raw_url.startswith('#'):
            continue

        target = raw_url.strip()
        if not target.startswith('http'):
            flash('url begin with http(s)://')
            return render_template('Message_static.html', Main_Infos=g.main_infos)

        outcome = Purge.purge_cdn(target)
        flash(target + ' purge CDN ' + outcome)

    return render_template('Message_static.html', Main_Infos=g.main_infos)
def chart_center_traffic():
    """Render per-minute top-URL traffic for the previous four minutes.

    For each minute, reads the top five members of the client and server
    sorted sets and scales each score by ``* 8 / 1024 / 1024``. Any
    exception is logged and a message page is rendered instead.
    """
    def _top_urls(key):
        # Top five members (ranks 0-4) with their scaled scores.
        return [[str(member), int(RC.zscore(key, member)) * 8 / 1024 / 1024]
                for member in RC.zrevrange(key, 0, 4)]

    try:
        Tra_cli_url_minute_datas = collections.OrderedDict()
        Tra_ser_url_minute_datas = collections.OrderedDict()
        for minutes_ago in range(1, 5):
            moment = datetime.datetime.now() - datetime.timedelta(minutes=minutes_ago)
            label = moment.strftime('%H:%M')
            Tra_cli_url_minute_datas[label] = _top_urls('traffic.cli.url_%s' % label)
            Tra_ser_url_minute_datas[label] = _top_urls('traffic.ser.url_%s' % label)
        return render_template(
            'chart_center_traffic.html',
            Main_Infos=g.main_infos,
            Tra_cli_url_minute_datas=Tra_cli_url_minute_datas,
            Tra_ser_url_minute_datas=Tra_ser_url_minute_datas)
    except Exception as e:
        logging.error(e)
        flash('??????!')
        return render_template('Message_static.html', Main_Infos=g.main_infos)
def gateway_domain():
    """Render today's top-URL data plus per-minute domain hit counts.

    Covers the previous six minutes. Any exception is logged and a message
    page is rendered instead.
    """
    try:
        today = time.strftime('%Y-%m-%d', time.localtime())
        # NOTE(review): eval() runs on strings read from the store; only
        # safe while that data is fully trusted - consider
        # ast.literal_eval.
        DATA = [eval(item) for item in RC.lrange('top_url_%s' % today, 0, -1)]
        TOP_URL_DATA = [{'data': DATA, 'name': 'conn'}]

        tables = ('????', '????')
        values = collections.OrderedDict()
        for minutes_ago in range(1, 7):
            moment = datetime.datetime.now() - datetime.timedelta(minutes=minutes_ago)
            label = moment.strftime("%H:%M")
            httpry_Key = 'httpry_domain.%s' % moment.strftime('%Y%m%d%H%M')
            values[label] = [[member, int(RC.zscore(httpry_Key, member))]
                             for member in RC.zrevrange(httpry_Key, 0, 10)]

        return render_template('gateway_domain.html', Main_Infos=g.main_infos,
                               tables=tables, values=values,
                               TOP_URL_DATA=TOP_URL_DATA)
    except Exception as e:
        logging.error(e)
        flash('??????!')
        return render_template('Message_static.html', Main_Infos=g.main_infos)
def backup_mysql_results():
    """Render the list of finished MySQL backups.

    When no finished backups are recorded, or an error occurs, a message
    is flashed and the generic message page is rendered instead.
    """
    produce.Async_log(g.user, request.url)
    try:
        Infos = Redis.lrange('finish_backup', 0, -1) if Redis.exists('finish_backup') else None
        if Infos:
            # NOTE(review): eval() runs on strings read from the store;
            # only safe while that data is fully trusted - consider
            # ast.literal_eval.
            Infos = [eval(info) for info in set(Infos)]
            tt = time.strftime('%Y-%m-%d', time.localtime())
            tables = ('??', '?????', 'MYSQL???', '?????', ' ??')
            return render_template('backup_mysql_results.html',
                                   Main_Infos=g.main_infos, Infos=Infos,
                                   tt=tt, tables=tables)
        # The original did `raise flash(...)`, which raises None and relied
        # on the resulting TypeError (and an 'old' substring check on its
        # message) to reach the message page. Flash and fall through.
        flash('????:?????????!')
    except Exception as e:
        flash(str(e))
    return render_template('Message_static.html', Main_Infos=g.main_infos)
def get_proxy_request_url(thisAuth, thisContainer=None, thisObject=None):
    """
    Build the url under which this API proxy reaches its swift back end:
    the configured store url for the account, followed by the optional
    container and object segments.

    :param thisAuth: value formatted into ``configuration.swift_store_url``
    :param thisContainer: optional container name
    :param thisObject: optional object name
    :return: the back-end url
    """
    segments = [configuration.swift_store_url.format(thisAuth)]
    # The object segment is appended even when no container is given.
    for part in (thisContainer, thisObject):
        if part:
            segments.append("/" + part)
    return "".join(segments)


##############################################################################
# Frontend Pool
##############################################################################
def handle_auth():
    """
    Forward the auth request to swift and, on success, replace the
    returned storage url with our own, e.g.
        'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
    becomes
        'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'

    This is the first request any client makes; the auth-token passed on
    from swift is used in further requests.

    :return: a Response mirroring swift's status, headers and body
    """
    status, headers, body = httpBackend.doAuthGetToken(
        reqHead=request.headers, method="GET")
    log.debug("swift response: {} {} {}".format(status, headers, body))

    # Only a successful auth response has its storage url rewritten.
    if status == 200:
        replaceStorageUrl(swiftResponse=headers)

    log.debug("proxy response: {} {} {}".format(status, headers, body))
    return Response(status=status, headers=headers, response=body)
def __init__(self, **kwargs):
    """Initialises a new ``Self`` link instance.

    Accepts the same Keyword Arguments as :class:`.Link`.

    Additional Keyword Args:
        external (bool): if true, force link to be fully-qualified URL,
            defaults to False

    See Also:
        :class:`.Link`
    """
    href = request.url
    force_external = kwargs.get('external', False)

    # Without a configured SERVER_NAME, non-external links are made
    # relative by replacing the host part with '/'.
    if not force_external and current_app.config['SERVER_NAME'] is None:
        href = request.url.replace(request.host_url, '/')

    return super(Self, self).__init__('self', href, **kwargs)
def feed():
    """Render the Atom feed of published searches, newest first.

    :return: a response with content type ``application/atom+xml``
    """
    rows = query(
        '''
        SELECT *
        FROM searches
        WHERE published IS NOT NULL
        ORDER BY id DESC
        ''', json=True)

    site_url = 'http://' + app.config['HOSTNAME']
    feed_url = site_url + '/feed/'

    entries = []
    for entry in map(_date_format, rows):
        # Each entry links to its summary page.
        entry['url'] = site_url + '/summary/' + entry['date_path'] + '/'
        entries.append(entry)

    rendered = render_template(
        'feed.xml',
        updated=entries[0]['created'],
        site_url=site_url,
        feed_url=feed_url,
        searches=entries
    )
    resp = make_response(rendered)
    resp.headers['Content-Type'] = 'application/atom+xml'
    return resp
def parse_post(post, external_links=False, create_html=True):
    """Load a blog post file and return its metadata dictionary.

    :param post: file name inside BLOG_CONTENT_DIR, shaped like
        ``YYYY-MM-DD-slug`` plus a three-character extension
    :param external_links: when true, build a fully-qualified post URL
    :param create_html: when true, render the body into ``data['html']``
    :return: the front-matter mapping extended with ``url``,
        ``reading_time`` and optionally ``html``
    """
    with open(os.path.join(BLOG_CONTENT_DIR, post)) as handle:
        raw = handle.read()
    frontmatter, content = REGEX_SPLIT_FRONTMATTER.split(raw, 2)

    # safe_load: plain yaml.load() without a Loader can construct arbitrary
    # objects and is rejected by current PyYAML releases.
    data = yaml.safe_load(frontmatter)

    # Strip the three-character extension, then split the date and slug.
    y, m, d, slug = post[:-3].split('-', maxsplit=3)

    if create_html:
        data['html'] = markdown.markdown(content, extensions=[
            'markdown.extensions.extra',
            'markdown.extensions.codehilite',
            'markdown.extensions.toc'
        ])

    data['url'] = url_for('blog_post', y=y, m=m, d=d, slug=slug,
                          _external=external_links)
    data['reading_time'] = reading_time(content)

    return data
def get_feed():
    """Build an Atom feed of up to 1000 sessions.

    Aborts with 404 when feed authentication is required and the current
    user is not authenticated.

    :return: the AtomFeed object itself
    """
    from mhn.common.clio import Clio
    from mhn.auth import current_user

    auth_required = mhn.config['FEED_AUTH_REQUIRED']
    if auth_required and not current_user.is_authenticated():
        abort(404)

    atom = AtomFeed('MHN HpFeeds Report',
                    feed_url=request.url, url=request.url_root)
    template = (u'Sensor "{identifier}" '
                '{source_ip}:{source_port} on sensorip:{destination_port}.')

    for sess in Clio().session.get(options={'limit': 1000}):
        summary = template.format(**sess.to_dict())
        session_url = makeurl(
            url_for('api.get_session', session_id=str(sess._id)))
        atom.add('Feed', summary, content_type='text',
                 published=sess.timestamp, updated=sess.timestamp,
                 url=session_url)
    return atom
def after_request_log(response):
    """Log a summary of the finished request and pass the response through.

    :param response: the outgoing response; returned unchanged
    :return: ``response``
    """
    name = dns_resolve(request.remote_addr)
    # Logger.warn() is a deprecated alias; warning() is the supported
    # spelling and behaves identically.
    current_app.logger.warning(u"""[client {ip} {host}] {http} "{method} {path}" {status}
    Request:   {method} {path}
    Version:   {http}
    Status:    {status}
    Url:       {url}
    IP:        {ip}
    Hostname:  {host}
    Agent:     {agent_platform} | {agent_browser} | {agent_browser_version}
    Raw Agent: {agent}
    """.format(method=request.method,
               path=request.path,
               url=request.url,
               ip=request.remote_addr,
               host=name if name is not None else '?',
               agent_platform=request.user_agent.platform,
               agent_browser=request.user_agent.browser,
               agent_browser_version=request.user_agent.version,
               agent=request.user_agent.string,
               http=request.environ.get('SERVER_PROTOCOL'),
               status=response.status))

    return response
def get(self):
    """List the current user's bookmarks, optionally filtered.

    Reads an optional ``url`` (fragment removed before matching) and
    repeated ``tag`` parameters from the query string. When another page
    follows, a ``Link`` header with ``rel="last"`` is added.

    :return: ``(list of bookmark dicts, 200, headers)``
    """
    wanted_url = request.args.get('url')
    wanted_tags = request.args.getlist('tag')

    conditions = [db.Bookmark.user == current_user.id]
    if wanted_url is not None:
        conditions.append(db.Bookmark.url == urldefrag(wanted_url).url)
    conditions.append(db.Bookmark.tags.contains(wanted_tags))

    ordering = (
        db.Bookmark.read.desc().nullsfirst(),
        db.Bookmark.timestamp.desc(),
    )
    page = db.Bookmark.query.filter(*conditions).order_by(*ordering).paginate()

    headers = {}
    links = []
    if page.has_next:
        last_url = update_query(request.url, {'page': page.pages})
        links.append(lh.Link(last_url, rel='last'))

    if links:
        headers['Link'] = lh.format_links(links)

    return [bookmark.to_dict() for bookmark in page.items], 200, headers
def add_entry():
    """Add a new entry to the bibliography and redirect to the listing.

    The redirect to "/biblio" happens whether or not the form validated.
    """
    form = BiblioForm()
    if form.validate_on_submit():
        new_entry = BiblioEntry(
            ID=form.ID.data,
            ENTRYTYPE=form.typ.data,
            authors=form.author.data,
            title=form.title.data,
            year=form.year.data,
            school="",
            publisher="",
            keywords=form.keywords.data,
            url=form.url.data,
            journal=form.journal.data,
        )
        db.session.add(new_entry)

        # Record who added the entry and when.
        added_by = current_user.name
        audit = Event(author=added_by, article=form.ID.data,
                      event="ADD", time=time.time())
        db.session.add(audit)
        db.session.commit()

    return redirect("/biblio")
def apidocs():
    """Serve the swagger description for the host this request reached.

    Host, port, base path and scheme are derived from the request URL. The
    other scheme (http/https) is added when a best-effort probe of the
    ``/scheme`` endpoint succeeds.

    :return: a response with mimetype ``application/json``
    """
    url = urlparse(request.url)
    if ":" in url.netloc:
        # rsplit keeps hosts that themselves contain ':' from breaking
        # the two-value unpacking.
        host, port = url.netloc.rsplit(":", 1)
    else:
        host = url.netloc
        port = "80"
    base_path = url.path.replace('/apidocs', '') if url.path != "/apidocs" else "/"
    schemes = [url.scheme]

    # `==`, not `is`: identity comparison against a str/int literal is
    # implementation-dependent and does not test equality.
    other_scheme = "https" if url.scheme == "http" else "http"
    try:
        # NOTE(review): `request.get` looks like it was meant to be an HTTP
        # client call (e.g. `requests.get`) - confirm. Failures here are
        # deliberately ignored.
        probe = request.get(other_scheme + "://" + url.netloc
                            + url.path.replace('/apidocs', '') + "/scheme")
        if probe.status_code == 200:
            schemes += [other_scheme]
    except Exception:
        # Best-effort probe: advertise only the current scheme.
        pass

    r = make_response(swagger.json(schemes, host, port, base_path))
    r.mimetype = 'application/json'
    return r
def tensorboard(logdir):
    """Redirect to a TensorBoard instance for ``logdir``, starting one if needed.

    The port for each log directory is remembered in ``_tensorboard_dirs``
    so later requests reuse the same instance.

    :param logdir: log directory passed to ``tensorboard --logdir``
    :return: a redirect to ``http://<request host>:<port>``
    """
    port = _tensorboard_dirs.get(logdir)
    if not port:
        # Bind to port 0 so the OS picks a free port, then release it.
        probe = socket.socket(socket.AF_INET)
        probe.bind(('', 0))
        port = probe.getsockname()[1]
        probe.close()

        command = ['tensorboard', '--logdir=' + logdir, '--port=' + str(port)]
        subprocess.Popen(command)
        time.sleep(5)  # wait for tensorboard to spin up
        _tensorboard_dirs[logdir] = port

    request_host = six.moves.urllib.parse.urlparse(request.url).hostname
    redirect_url = 'http://{}:{}'.format(request_host, port)
    logger.debug('Redirecting to ' + redirect_url)
    return redirect(redirect_url)
def make_error_page(app, name, code, sentry=None, data=None, exception=None):
    ''' Build the error dictionary for a web error and render its page.

    The template is ``errors/<name lowercased, spaces as underscores>.html``.

    :return: ``(rendered page, code)``
    '''
    has_description = exception and hasattr(exception, 'description')
    error = {
        'title': 'Marvin | {0}'.format(name),
        'page': request.url,
        'event_id': g.get('sentry_event_id', None),
        'data': data,
        'name': name,
        'code': code,
        'message': exception.description if has_description else None,
    }

    if app.config['USE_SENTRY'] and sentry:
        error['public_dsn'] = sentry.client.get_public_dsn('https')

    app.logger.error('{0} Exception {1}'.format(name, error))

    template_name = 'errors/{0}.html'.format(name.lower().replace(' ', '_'))
    return render_template(template_name, **error), code


# ----------------
# Error Handling
# ----------------
def weibo_nearbytimeline_wrapper():
    """Return nearby-weibo statistics as JSON, cached per request URL.

    Reads ``lat``, ``lng``, ``starttime``, ``endtime`` and ``range`` from
    the query string. Any failure is printed and answered with a 404.
    """
    try:
        data = r.get(request.url)
        if data is None:
            data = weibo_service.get_all_weibo_nearby_async(
                request.args["lat"],
                request.args["lng"],
                int(request.args["starttime"]),
                int(request.args["endtime"]),
                int(request.args["range"])
            )
            data = json.dumps(weibo_service.nearby_weibo_statis_wrapper(data))
            r.set(request.url, data)
        return data
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt.
        traceback.print_exc()
        abort(404)
def get_user_flow_to_html():
    """Return user-flow data for a hotel as JSON, cached per request URL.

    Reads ``hotel_name``, ``baseinfo_id``, ``page`` and the optional
    ``ring_str`` from the query string. Any failure is printed and
    answered with a 404.
    """
    try:
        data = r.get(request.url)
        if data is None:
            # A missing parameter falls back to None.
            ring_str = request.args.get("ring_str")
            data = hotel_data_service.get_user_flow_to_html(
                request.args["hotel_name"],
                request.args["baseinfo_id"].encode("utf-8"),
                int(request.args["page"]),
                ring_str=ring_str
            )
            data = json.dumps(data)
            r.set(request.url, data)
        return data
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt.
        traceback.print_exc()
        abort(404)
def query_floorstate():
    """Return the floor state as JSON, cached per request URL.

    Reads ``floornum`` and ``time`` from the query string. Any failure is
    printed and answered with a 404.
    """
    try:
        result = r.get(request.url)
        if result is None:
            result = pms_service.query_floorstate(
                request.args['floornum'].encode('utf-8'),
                request.args['time'].encode('utf-8')
            )
            result = json.dumps(result, ensure_ascii=False)
            r.set(request.url, result)
        return result
    except Exception as e:
        # `except Exception, e` and the print statement only parse on
        # Python 2; this spelling works on Python 2.6+ and Python 3.
        print(e)
        traceback.print_exc()
        abort(404)
def callback():
    """ Step 3: Retrieving an access token.

    The provider has redirected the user back to the registered callback
    URL. The redirect URL carries an authorization code, which is
    exchanged here for an access token.
    """
    oauth_session = OAuth2Session(
        client_id,
        redirect_uri=redirect_uri,
        state=session['oauth_state'],
    )
    token = oauth_session.fetch_token(
        token_url,
        client_secret=client_secret,
        authorization_response=request.url,
    )

    # The session doubles as a simple DB for this example.
    session['oauth_token'] = token
    update_token_tempfile(token)  # store token in /tmp/mautic_creds.json
    return redirect(url_for('.menu'))
def paths(self):
    """
    :class:`dict`: The top level :swagger:`pathsObject`.
    """
    skipped_methods = ('HEAD', 'OPTIONS')
    collected = {}
    for rule in self.app.url_map.iter_rules():
        if rule.endpoint == 'static':
            continue

        log.info('Processing %r', rule)

        url, parameters = parse_werkzeug_url(rule.rule)
        path_item = collected.setdefault(url, {})
        if parameters:
            path_item['parameters'] = parameters

        for method in rule.methods:
            # XXX Do we want to process HEAD and OPTIONS?
            if method not in skipped_methods:
                path_item[method.lower()] = self._process_rule(rule)

    return collected
def deprecated(self, fn):
    """
    Mark an operation as deprecated.

    Sets ``fn.deprecated = True`` so the flag can be exposed through the
    OpenAPI operation object, and returns a wrapper that emits a notice on
    every call. The notice is a :class:`DeprecationWarning` when the
    ``OPENAPI_WARN_DEPRECATED`` option is ``warn``; otherwise it is
    written to the log.

    See :swagger:`operationDeprecated`.
    """
    fn.deprecated = True

    @functools.wraps(fn)
    def call_deprecated(*args, **kwargs):
        mode = self._config('warn_deprecated')
        details = (request.method, request.url)
        if mode != 'warn':
            log.warning(_DEPRECATION_MESSAGE, *details)
        else:
            warnings.warn(_DEPRECATION_MESSAGE % details,
                          DeprecationWarning)
        return fn(*args, **kwargs)

    return call_deprecated
def getCurrentSpotifyPlaylistList():
    """Return the tracks of the hard-coded Spotify playlist as dicts.

    Each dict has ``img_url``, ``name``, ``artist``, ``album`` and
    ``duration`` (minutes, two decimals). Returns an empty list when no
    access token is available.
    """
    ret_list = []
    access_token = getSpotifyToken()
    if not access_token:
        return ret_list

    sp = spy.Spotify(access_token)
    # The result was never used in the original either; the call is kept.
    sp.current_user()

    uri = 'spotify:user:12120746446:playlist:6ZK4Tz0ZsZuJBYyDZqlbGt'
    uri_parts = uri.split(":")
    playlist = sp.user_playlist(uri_parts[2], uri_parts[4])

    for item in playlist['tracks']['items']:
        track = item['track']
        minutes = int(track['duration_ms']) / (1000 * 60)
        ret_list.append({
            'img_url': track['album']['images'][2]['url'],
            'name': track['name'],
            'artist': track['artists'][0]['name'],
            'album': track['album']['name'],
            'duration': "{:2.2f}".format(minutes),
        })
    return ret_list
def getCurrentYoutubePlaylistList():
    """Return up to 30 videos of the configured YouTube playlist as dicts.

    Each dict has the same keys as the Spotify variant; ``artist``,
    ``album`` and ``duration`` are placeholders.
    """
    youtube = initYoutube()
    listing = youtube.playlistItems().list(
        playlistId=constants.YOUTUBE_PLAYLIST_ID,
        part="snippet,contentDetails",
        maxResults=30
    ).execute()

    # One dict per playlist item.
    ret_list = []
    for item in listing['items']:
        snippet = item['snippet']
        ret_list.append({
            'name': snippet['title'],
            'img_url': snippet['thumbnails']['default']['url'],
            'artist': "--",
            'album': "--",
            'duration': "--:--",
        })
    return ret_list