我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.Response()。
def get(self, oid): """Get an object. Returns an item from the DB with the request.data JSON object or all the items if oid == None :arg self: The class of the object to be retrieved :arg integer oid: the ID of the object in the DB :returns: The JSON item/s stored in the DB """ try: ensure_authorized_to('read', self.__class__) query = self._db_query(oid) json_response = self._create_json_response(query, oid) return Response(json_response, mimetype='application/json') except Exception as e: return error.format_exception( e, target=self.__class__.__name__.lower(), action='GET')
def export_to_csv(self, ids): qs = json.loads(self.model.objects(id__in=ids).to_json()) def generate(): yield ','.join(list(max(qs, key=lambda x: len(x)).keys())) + '\n' for item in qs: yield ','.join([str(i) for i in list(item.values())]) + '\n' return Response( generate(), mimetype="text/csv", headers={ "Content-Disposition": "attachment;filename=%s.csv" % self.model.__name__.lower() } )
def enqueue_mwoffliner(): token = UserJWT.from_request_header(request) # only admins can enqueue task if not token.is_admin: raise exception.NotEnoughPrivilege() def check_task(config: dict): # check config is a dict if not isinstance(config, dict): raise exception.InvalidRequest() # check config has mandatory data if config.get('mwUrl') is None or config.get('adminEmail') is None: raise exception.InvalidRequest() def enqueue_task(config: dict): task_name = 'mwoffliner' celery_task = celery.send_task(task_name, kwargs={ 'token': TaskJWT.new(task_name), 'config': config }) TasksCollection().insert_one({ '_id': celery_task.id, 'celery_task_name': task_name, 'status': 'PENDING', 'time_stamp': {'created': datetime.utcnow(), 'started': None, 'ended': None}, 'options': config, 'steps': [] }) task_configs = request.get_json() if not isinstance(task_configs, list): raise exception.InvalidRequest() for task_config in task_configs: check_task(task_config) for task_config in task_configs: enqueue_task(task_config) return Response(status=202)
def invalid_request(_): return Response(status=400)
def retrieveAlertsCyber(): """ Retrieve Alerts from ElasticSearch and return formatted XML with limited alert content """ # get result from cache getCacheResult = getCache(request.url, "url") if getCacheResult is not False: app.logger.debug('Returning /retrieveAlertsCyber from Cache for %s' % str(request.remote_addr)) return Response(getCacheResult) # query ES else: returnResult = formatAlertsXml(queryAlerts(app.config['MAXALERTS'], checkCommunityIndex(request))) setCache(request.url, returnResult, 1, "url") app.logger.debug('Returning /retrieveAlertsCyber from ES for %s' % str(request.remote_addr)) return Response(returnResult, mimetype='text/xml')
def querySingleIP(): """ Retrieve Attack data from index about a single IP """ # get result from cache getCacheResult = getCache(request.url, "url") if getCacheResult is not False: app.logger.debug('Returning /querySingleIP from Cache for %s' % str(request.remote_addr)) return Response(getCacheResult) # query ES else: returnResult = formatSingleIP(queryForSingleIP(app.config['MAXALERTS'], request.args.get('ip'), checkCommunityIndex(request))) setCache(request.url, returnResult, 60, "url") app.logger.debug('Returning /querySingleIP from ES for %s' % str(request.remote_addr)) return Response(returnResult, mimetype='text/xml') # Routes with both XML and JSON output
def retrieveAlertsCount(): """ Retrieve number of alerts in timeframe (GET-Parameter time as decimal or "day") """ # Retrieve Number of Alerts from ElasticSearch and return as xml / json if not request.args.get('time'): app.logger.error('No time GET-parameter supplied in retrieveAlertsCount. Must be decimal number (in minutes) or string "day"') return app.config['DEFAULTRESPONSE'] else: if request.args.get('out') and request.args.get('out') == 'json': # get result from cache getCacheResult = getCache(request.url, "url") if getCacheResult is not False: return jsonify(getCacheResult) else: returnResult = formatAlertsCount(queryAlertsCount(request.args.get('time'), checkCommunityIndex(request)), 'json') setCache(request.url, returnResult, 60, "url") return jsonify(returnResult) else: # get result from cache getCacheResult = getCache(request.url, "url") if getCacheResult is not False: return Response(getCacheResult, mimetype='text/xml') else: returnResult = formatAlertsCount(queryAlertsCount(request.args.get('time'), checkCommunityIndex(request)), 'xml') setCache(request.url, returnResult, 60, "url") return Response(returnResult, mimetype='text/xml')
def retrieveIPs(): """ Retrieve IPs from ElasticSearch and return formatted XML or JSON with IPs """ if request.args.get('out') and request.args.get('out') == 'json': getCacheResult = getCache(request.url, "url") if getCacheResult is not False: return jsonify(getCacheResult) else: returnResult = formatBadIP( queryBadIPs(app.config['BADIPTIMESPAN'], checkCommunityIndex(request)), 'json') setCache(request.url, returnResult, 60, "url") return jsonify(returnResult) else: getCacheResult = getCache(request.url, "url") if getCacheResult is not False: return Response(getCacheResult, mimetype='text/xml') else: returnResult = formatBadIP( queryBadIPs(app.config['BADIPTIMESPAN'], checkCommunityIndex(request)), 'xml') setCache(request.url, returnResult, 60, "url") return Response(returnResult, mimetype='text/xml') # Routes with JSON output
def getSimpleMessage(): return Response("POST is required for this action.", mimetype='text/html', status=500)
def authFailure(): """ Authentication failure --> 401 """ return Response("Authentication failed!", 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]: """ Given a function returns one that requires basic auth """ @wraps(func) def decorator(): auth = request.authorization if auth and validAuth(auth.username, auth.password): return func() return authFailure() return decorator
def cache(self, seconds=60): def inner_decorator(f): @wraps(f) def wrapper(*args, **kwds): resp = f(*args, **kwds) if not isinstance(resp, flask.Response): resp = flask.make_response(resp) resp.headers['Cache-Control'] = 'public, max-age={}'.format(seconds) resp.headers["Expires"] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(time.time() + seconds)) resp.add_etag() return resp return wrapper return inner_decorator
def require_api_token(self, f): @wraps(f) def wrapper(*args, **kwds): token = flask.request.headers.get('X-Status-API-Token', flask.request.headers.get('X-Live-Status-API-Token')) if token != self.settings['api_token']: resp = flask.make_response(flask.jsonify({}), 403) else: resp = f(*args, **kwds) if not isinstance(resp, flask.Response): resp = flask.make_response(resp) resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' resp.headers['Pragma'] = 'no-cache' resp.headers['Expires'] = '0' return resp return wrapper
def _chat(): data = request.args question = data.get('question') session = data.get('session') lang = data.get('lang', 'en') query = data.get('query', 'false') query = query.lower() == 'true' request_id = request.headers.get('X-Request-Id') marker = data.get('marker', 'default') response, ret = ask( question, lang, session, query, request_id=request_id, marker=marker) return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _batch_chat(): auth = request.form.get('Auth') if not auth or not check_auth(auth): return authenticate() questions = request.form.get('questions') questions = json.loads(questions) session = request.form.get('session') lang = request.form.get('lang', 'en') responses = [] for idx, question in questions: response, ret = ask(str(question), lang, session) responses.append((idx, response, ret)) return Response(json_encode({'ret': 0, 'response': responses}), mimetype="application/json")
def _said(): data = request.args session = data.get('session') message = data.get('message') ret, response = said(session, message) return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _rate(): data = request.args response = '' try: ret = rate_answer(data.get('session'), int( data.get('index')), data.get('rate')) except Exception as ex: response = ex.message return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _chatbots(): data = request.args lang = data.get('lang', None) session = data.get('session') characters = list_character(lang, session) return Response(json_encode({'ret': 0, 'response': characters}), mimetype="application/json")
def _start_session(): botname = request.args.get('botname') user = request.args.get('user') test = request.args.get('test', 'false') refresh = request.args.get('refresh', 'false') test = test.lower() == 'true' refresh = refresh.lower() == 'true' sid = session_manager.start_session( user=user, key=botname, test=test, refresh=refresh) sess = session_manager.get_session(sid) sess.sdata.botname = botname sess.sdata.user = user return Response(json_encode({'ret': 0, 'sid': str(sid)}), mimetype="application/json")
def _sessions(): sessions = session_manager.list_sessions() return Response(json_encode({'ret': 0, 'response': sessions}), mimetype="application/json")
def _set_weights(): data = request.args lang = data.get('lang', None) param = data.get('param') sid = data.get('session') ret, response = set_weights(param, lang, sid) if ret: sess = session_manager.get_session(sid) if sess and hasattr(sess.sdata, 'weights'): logger.info("Set weights {} successfully".format(sess.sdata.weights)) else: logger.info("Set weights failed.") return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _set_context(): data = request.args context_str = data.get('context') context = {} for tok in context_str.split(','): k, v = tok.split('=') context[k.strip()] = v.strip() sid = data.get('session') ret, response = set_context(context, sid) return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _remove_context(): data = request.args keys = data.get('keys') keys = keys.split(',') sid = data.get('session') ret, response = remove_context(keys, sid) return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _update_config(): data = request.args.to_dict() for k, v in data.iteritems(): if v.lower() == 'true': data[k]=True elif v.lower() == 'false': data[k]=False elif re.match(r'[0-9]+', v): data[k]=int(v) elif re.match(r'[0-9]+\.[0-9]+', v): data[k]=float(v) else: data[k]=str(v) ret, response = update_config(**data) return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def _log(): def generate(): with open(LOG_CONFIG_FILE) as f: for row in f: yield row return Response(generate(), mimetype='text/plain')
def _reset_session(): data = request.args sid = data.get('session') if session_manager.has_session(sid): session_manager.reset_session(sid) ret, response = True, "Session reset" else: ret, response = False, "No such session" return Response(json_encode({ 'ret': ret, 'response': response }), mimetype="application/json")
def _dump_history(): try: dump_history() ret, response = True, "Success" except Exception: ret, response = False, "Failure" return Response(json_encode({ 'ret': ret, 'response': response }), mimetype="application/json")
def _stats(): try: data = request.args days = int(data.get('lookback', 7)) dump_history() response = history_stats(HISTORY_DIR, days) ret = True except Exception as ex: ret, response = False, {'err_msg': str(ex)} logger.error(ex) return Response(json_encode({'ret': ret, 'response': response}), mimetype="application/json")
def authenticate(): return Response(json_encode({'ret': 401, 'response': {'text': 'Could not verify your access'}}), mimetype="application/json")
def add_virtual_tn(): """ The VirtualTN resource endpoint for adding VirtualTN's from the pool. """ body = request.json try: value = str(body['value']) assert len(value) <= 18 except (AssertionError, KeyError): raise InvalidAPIUsage( "Required argument: 'value' (str, length <= 18)", payload={'reason': 'invalidAPIUsage'}) virtual_tn = VirtualTN(value) try: db_session.add(virtual_tn) db_session.commit() except IntegrityError: db_session.rollback() msg = ("Did not add virtual TN {} to the pool " "-- already exists").format(value) log.info({"message": msg}) raise InvalidAPIUsage( "Virtual TN already exists", payload={'reason': 'duplicate virtual TN'}) return Response( json.dumps( {"message": "Successfully added TN to pool", "value": value}), content_type="application/json")
def list_virtual_tns(): """ The VirtualTN resource endpoint for listing VirtualTN's from the pool. """ virtual_tns = VirtualTN.query.all() res = [{'value': tn.value, 'session_id': tn.session_id} for tn in virtual_tns] available = len([tn.value for tn in virtual_tns if tn.session_id is None]) return Response( json.dumps({"virtual_tns": res, "pool_size": len(res), "available": available, "in_use": len(res) - available}), content_type="application/json")
def list_proxy_sessions(): """ The ProxySession resource endpoint for listing ProxySessions from the pool. """ sessions = ProxySession.query.all() res = [{ 'id': s.id, 'date_created': s.date_created.strftime('%Y-%m-%d %H:%M:%S'), 'virtual_tn': s.virtual_TN, 'participant_a': s.participant_a, 'participant_b': s.participant_b, 'expiry_date': s.expiry_date.strftime('%Y-%m-%d %H:%M:%S') if s.expiry_date else None} for s in sessions] return Response( json.dumps({"total_sessions": len(res), "sessions": res}), content_type="application/json")
def delete_session(): """ The ProxySession resource endpoint for removing a ProxySession to the pool. """ body = request.json try: session_id = str(body['session_id']) except (KeyError): raise InvalidAPIUsage( "Required argument: 'session_id' (str)", payload={'reason': 'invalidAPIUsage'}) try: session = ProxySession.query.filter_by(id=session_id).one() except NoResultFound: msg = ("ProxySession {} could not be deleted because" " it does not exist".format(session_id)) log.info({"message": msg, "status": "failed"}) raise InvalidAPIUsage( msg, status_code=404, payload={'reason': 'ProxySession not found'}) participant_a, participant_b, virtual_tn = ProxySession.terminate( session_id) recipients = [participant_a, participant_b] send_message( recipients, virtual_tn.value, SESSION_END_MSG, session_id, is_system_msg=True) msg = "Ended session {} and released {} back to pool".format( session_id, virtual_tn.value) log.info({"message": msg, "status": "succeeded"}) return Response( json.dumps({"message": "Successfully ended the session.", "status": "succeeded", "session_id": session_id}), content_type="application/json")
def getSwaggerJson(): json_file = open(os.path.join( "./", "messages.swagger.json"), "r") json = json_file.read() json_file.close() resp = Response(response=json, status=200, mimetype="application/json") return resp
def corpora_list(): page = 1 query = None category = None doc = None if 'page' in request.args: page = request.args['page'] if 'query' in request.args: query = request.args['query'] if 'category' in request.args: category = request.args['category'] if 'doc' in request.args: doc = unquote(request.args['doc']) return Response(json.dumps(get_sentences(page=page, query=query, category=category, document=doc)), mimetype='application/json')
def corpora_docs_list(): return Response(json.dumps(get_doc_list()), mimetype='application/json')
def docs_list(): page = 1 doc_type = None state = None if 'page' in request.args: page = request.args['page'] if 'doc_type' in request.args: doc_type = request.args['doc_type'] if 'state' in request.args: state = request.args['state'] states = [state] if state == ST_PENDING: states.append(ST_PROCESSING) if state == ST_PROCESSED: states.append(ST_ERROR) return Response(json.dumps(get_queue_list(page=page, doc_type=doc_type, states=states), default=datetime_handler), mimetype='application/json')
def geocoding_list(): activity_id = None queue_id = None if 'activity_id' in request.args: activity_id = request.args['activity_id'] if 'queue_id' in request.args: queue_id = request.args['queue_id'] return Response(json.dumps(get_geocoding_list(activity_id=activity_id, queue_id=queue_id), default=datetime_handler), mimetype='application/json')
def activity_list(): document_id = None if 'document_id' in request.args: document_id = request.args['document_id'] return Response(json.dumps(get_activity_list(document_id=document_id), default=datetime_handler), mimetype='application/json')
def haproxy_config(): return flask.Response(Haproxy().config, mimetype='application/txt')
def process_demoroom_members(): # Verify that the request is propery authorized #authz = valid_request_check(request) #if not authz[0]: # return authz[1] status = 200 if request.method == "POST": data = request.form try: sys.stderr.write("Adding %s to demo room.\n" % (data["email"])) reply=send_welcome_message(data["email"]) status = 201 resp = Response(reply, content_type='application/json', status=status) except KeyError: error = {"Error":"API Expects dictionary object with single element and key of 'email'"} status = 400 resp = Response(json.dumps(error), content_type='application/json', status=status) # demo_room_members = get_membership() # resp = Response( # json.dumps(demo_room_members, sort_keys=True, indent = 4, separators = (',', ': ')), # content_type='application/json', # status=status) else: resp = Response("OK", status=status) return resp
def remove_authorised_client(ip_addr_str=None): """Forgets that a client has been seen recently to allow running tests""" source_ip = request.headers["X-Forwarded-For"] if source_ip in _client_map: del _client_map[source_ip] return Response(status=204)
def custom_css(): return Response(get_config("css"), mimetype='text/css') # Static HTML files
def post_json(self, endpoint: str, data: JsonDict) -> Response: return self.client.post(endpoint, content_type="application/json", data=json.dumps(data))
def test_permalinks_work(self): db = InMemoryDemoDatabase() app = make_app(build_dir=self.TEST_DIR, demo_db=db) predictor = CountingPredictor() app.predictors = {"counting": predictor} app.testing = True client = app.test_client() def post(endpoint: str, data: JsonDict) -> Response: return client.post(endpoint, content_type="application/json", data=json.dumps(data)) data = {"some": "input"} response = post("/predict/counting", data=data) assert response.status_code == 200 result = json.loads(response.get_data()) slug = result.get("slug") assert slug is not None response = post("/permadata", data={"slug": "not the right slug"}) assert response.status_code == 400 response = post("/permadata", data={"slug": slug}) assert response.status_code == 200 result2 = json.loads(response.get_data()) assert set(result2.keys()) == {"modelName", "requestData", "responseData"} assert result2["modelName"] == "counting" assert result2["requestData"] == data assert result2["responseData"] == result
def show_page(page): try: valid_length = len(page) >= MIN_PAGE_NAME_LENGTH valid_name = re.match('^[a-z]+$', page.lower()) is not None if valid_length and valid_name: return render_template("{}.html".format(page)) else: msg = "Sorry, couldn't find page with name {}".format(page) raise NotFound(msg) except: rollbar.report_exc_info() return Response("404 Not Found")
def make_response(self, response_data, headers): # TODO: pass in optional filename filename = "response.csv" headers["Content-Disposition"] = "attachment; filename=\"{}\"".format(filename) headers["Content-Type"] = "{}; charset=utf-8".format(CSV_CONTENT_TYPE) response = Response(self.csvify(response_data), mimetype=CSV_CONTENT_TYPE) return response, headers
def csvify(self, response_data): """ Make Flask `Response` object, with data returned as a generator for the CSV content The CSV is built from JSON-like object (Python `dict` or list of `dicts`) """ if "items" in response_data: list_response_data = response_data["items"] else: list_response_data = [response_data] response_fields = list(list_response_data[0].keys()) column_order = getattr(self.response_schema, "csv_column_order", None) if column_order is None: # We should still be able to return a CSV even if no column order has been specified column_names = response_fields else: column_names = self.response_schema.csv_column_order # The column order be only partially specified column_names.extend([field_name for field_name in response_fields if field_name not in column_names]) output = StringIO() csv_writer = writer(output, quoting=QUOTE_MINIMAL) csv_writer.writerow(column_names) for item in list_response_data: csv_writer.writerow([item[column] for column in column_names]) # Ideally we'd want to `yield` each line to stream the content # But something downstream seems to break streaming yield output.getvalue()
def setup(self): @self.app.route('/anomaly', methods = ['POST']) def api_anomaly(): data = request.json if request.headers['Content-Type'] == 'application/json': success = self.app.monitor.process_anomaly_data(data) return self.handle_response(success, data) else: return Response("Unsupported media type\n" + data, status=415) @self.app.route('/monitor', methods = ['POST']) def api_monitor(): data = request.json if request.headers['Content-Type'] == 'application/json': success = self.app.monitor.process_monitor_flows(data) return self.handle_response(success, data) else: return Response("Unsupported media type\n" + data, status=415)
def handle_response(self, success, data): json_data = json.dumps(data) if success: return Response("OK\n" + json_data, status=200) else: return Response("BAD REQUEST\n" + json_data, status=400)