我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.make_response()。
def bookmarklet_js(): base_url = request.url.replace( "browser-tools/bookmarklet.js", "static/browser-tools/" ) if "localhost:" not in base_url: # seems like this shouldn't be necessary. but i think # flask's request.url is coming in with http even when # we asked for https on the server. weird. base_url = base_url.replace("http://", "https://") rendered = render_template( "browser-tools/bookmarklet.js", base_url=base_url ) resp = make_response(rendered, 200) resp.mimetype = "application/javascript" return resp
def session_chain_add(session_name, table_name, chain_name): try: req = request.get_json() if req is None: req = {} fw.sessions[session_name].chains.add( name=chain_name, table=table_name, **req) return make_response(jsonify(message='Chain created'), status.HTTP_201_CREATED) except ChainExists as e: http_code = status.HTTP_409_CONFLICT except ChainNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to create chain', error=e.message), http_code)
def get(self): if request.cookies.get('save_id'): resp = make_response(redirect(url_for('.exit'))) resp.set_cookie('user_name', expires=0) resp.set_cookie('login_time', expires=0) resp.set_cookie('save_id', expires=0) return resp if session.get('name'): session.pop('name') if session.get('show_name'): session.pop('show_name') if session.get('user_id'): session.pop('user_id') return redirect(url_for('.login')) # ?config.json ???? is_register ?false??????? ??????????????
def post_file(): if "var.json" in request.files.keys(): usrvar = json.load(request.files["var.json"]) else: return make_response("no var.json", 200) if "data.csv" in request.files.keys(): csvfile = request.files["data.csv"] resp = { "server-connected": False, "data-posted": False, "err-occur": False } rpmng = ReportManager(usrvar) try: if rpmng.connect_server(): resp["server-connected"] = True if rpmng.submit_progress(csvfile=csvfile): resp["data-posted"] = True except: resp["err-occur"] = True rpmng.finalize() return make_response(jsonify(resp), 200) else: return make_response("no data.csv", 200)
def session_interface_add(session_name, interface_name): try: req = request.get_json() if req is None: req = {} fw.sessions[session_name].interfaces.add( name=interface_name, **req) return make_response(jsonify(message='Interface created'), status.HTTP_201_CREATED) except InterfaceExists as e: http_code = status.HTTP_409_CONFLICT except InterfaceNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to create interface', error=e.message), http_code)
def post(self): if (request.form['username']): data = {"user": request.form['username'], "key": request.form['password']} result = dockletRequest.unauthorizedpost('/login/', data) ok = result and result.get('success', None) if (ok and (ok == "true")): # set cookie:docklet-jupyter-cookie for jupyter notebook resp = make_response(redirect(request.args.get('next',None) or '/dashboard/')) app_key = os.environ['APP_KEY'] resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key)) # set session for docklet session['username'] = request.form['username'] session['nickname'] = result['data']['nickname'] session['description'] = result['data']['description'] session['avatar'] = '/static/avatar/'+ result['data']['avatar'] session['usergroup'] = result['data']['group'] session['status'] = result['data']['status'] session['token'] = result['data']['token'] return resp else: return redirect('/login/') else: return redirect('/login/')
def get(self): form = external_generate.external_auth_generate_request() result = dockletRequest.unauthorizedpost('/external_login/', form) ok = result and result.get('success', None) if (ok and (ok == "true")): # set cookie:docklet-jupyter-cookie for jupyter notebook resp = make_response(redirect(request.args.get('next',None) or '/dashboard/')) app_key = os.environ['APP_KEY'] resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key)) # set session for docklet session['username'] = result['data']['username'] session['nickname'] = result['data']['nickname'] session['description'] = result['data']['description'] session['avatar'] = '/static/avatar/'+ result['data']['avatar'] session['usergroup'] = result['data']['group'] session['status'] = result['data']['status'] session['token'] = result['data']['token'] return resp else: return redirect('/login/')
def post(self): form = external_generate.external_auth_generate_request() result = dockletRequest.unauthorizedpost('/external_login/', form) ok = result and result.get('success', None) if (ok and (ok == "true")): # set cookie:docklet-jupyter-cookie for jupyter notebook resp = make_response(redirect(request.args.get('next',None) or '/dashboard/')) app_key = os.environ['APP_KEY'] resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key)) # set session for docklet session['username'] = result['data']['username'] session['nickname'] = result['data']['nickname'] session['description'] = result['data']['description'] session['avatar'] = '/static/avatar/'+ result['data']['avatar'] session['usergroup'] = result['data']['group'] session['status'] = result['data']['status'] session['token'] = result['data']['token'] return resp else: return redirect('/login/')
def verify_request_signature(func): @wraps(func) def decorated(*args, **kwargs): signature = request.headers.get('x-hub-signature', None) if signature: elements = signature.split('=') method = elements[0] signature_hash = elements[1] expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest() if signature_hash != expected_hash: LOGGER.error('Signature was invalid') return make_response('', 403) else: LOGGER.error('Could not validate the signature') return func(*args, **kwargs) return decorated
def webhook_callback(): data = request.json if data['object'] == 'page': for page_entry in data['entry']: page_id = page_entry['id'] time_of_event = page_entry['time'] for message_event in page_entry['messaging']: if 'optin' in message_event: LOGGER.info('Webhook received message event: option from page %s at %d', page_id, time_of_event) elif 'message' in message_event: received_message(message_event) elif 'delivery' in message_event: LOGGER.info('Webhook received message event: delivery from page %s at %d', page_id, time_of_event) elif 'postback' in message_event: received_postback(message_event) elif 'read' in message_event: LOGGER.info('Webhook received message event: read from page %s at %d', page_id, time_of_event) elif 'account_linking' in message_event: LOGGER.info('Webhook received message event: account linking from page %s at %d', page_id, time_of_event) else: LOGGER.info('Webhook received unknown message event: %s from page %s at %d', message_event, page_id, time_of_event) return make_response('', 200)
def test_view_decorators(self): app = flask.Flask(__name__) def add_x_parachute(f): def new_function(*args, **kwargs): resp = flask.make_response(f(*args, **kwargs)) resp.headers['X-Parachute'] = 'awesome' return resp return new_function class Index(flask.views.View): decorators = [add_x_parachute] def dispatch_request(self): return 'Awesome' app.add_url_rule('/', view_func=Index.as_view('index')) c = app.test_client() rv = c.get('/') self.assert_equal(rv.headers['X-Parachute'], 'awesome') self.assert_equal(rv.data, b'Awesome')
def test_make_response(self): app = flask.Flask(__name__) with app.test_request_context(): rv = flask.make_response() self.assert_equal(rv.status_code, 200) self.assert_equal(rv.data, b'') self.assert_equal(rv.mimetype, 'text/html') rv = flask.make_response('Awesome') self.assert_equal(rv.status_code, 200) self.assert_equal(rv.data, b'Awesome') self.assert_equal(rv.mimetype, 'text/html') rv = flask.make_response('W00t', 404) self.assert_equal(rv.status_code, 404) self.assert_equal(rv.data, b'W00t') self.assert_equal(rv.mimetype, 'text/html')
def __call__(self, handler=None, error=None, request_type=None): """ :type error: ?? :param request_type: ???? :return: """ result = super(HTMLResponse, self).__call__(handler, error, request_type) if self.request_type == RequestTypeEnum.Flask: from flask import make_response html = '<p>code:%s message:%s</p>' % (result["code"], result["message"]) response = make_response(html) response.headers["Content-Type"] = "text/html; charset=utf-8" return response else: html = '<p>code:%s message:%s</p>' % (result["code"], result["message"]) self.handler.set_header("Content-Type", "text/html; charset=utf-8") return self.handler.write(html)
def password_required(short_name): (project, owner, n_tasks, n_task_runs, overall_progress, last_activity, n_results) = project_by_shortname(short_name) form = PasswordForm(request.form) if request.method == 'POST' and form.validate(): password = request.form.get('password') cookie_exp = current_app.config.get('PASSWD_COOKIE_TIMEOUT') passwd_mngr = ProjectPasswdManager(CookieHandler(request, signer, cookie_exp)) if passwd_mngr.validates(password, project): response = make_response(redirect(request.args.get('next'))) return passwd_mngr.update_response(response, project, get_user_id_or_ip()) flash(gettext('Sorry, incorrect password')) return render_template('projects/password.html', project=project, form=form, short_name=short_name, next=request.args.get('next'))
def run_cmd(): """Execute general (non-host specific) command.""" #Read 'command' argument from query string. comm = request.args.get('command') if not comm or comm in EXCLUDED_FUNCTIONS: abort(400) try: #Read 'args' argument from query string. args = request.args.getlist('args') #Cast arguments to string. for i, val in enumerate(args): args[i] = str(val) except KeyError: args = [] #Execute command. ret = RSPET_API.call_plugin(comm, args) if ret['code'] == rspet_server.ReturnCodes.OK: http_code = 200 else: http_code = 404 return make_response(jsonify(ret), http_code)
def ThreeDTilesRead(table, column, bounds, lod): session = Session(table, column) # offsets = [round(off, 2) for off in list_from_str(offsets)] box = list_from_str(bounds) # requested = [scales, offsets] stored_patches = session.lopocstable.filter_stored_output() schema = stored_patches['point_schema'] pcid = stored_patches['pcid'] # scales = [scale] * 3 scales = stored_patches['scales'] offsets = stored_patches['offsets'] [tile, npoints] = get_points(session, box, lod, offsets, pcid, scales, schema) if Config.DEBUG: tile.sync() print("NPOINTS: ", npoints) # build flask response response = make_response(tile.to_array().tostring()) response.headers['content-type'] = 'application/octet-stream' return response
def api_send_loves(): sender = request.form.get('sender') recipients = sanitize_recipients(request.form.get('recipient')) message = request.form.get('message') try: recipients = send_loves(recipients, message, sender_username=sender) recipients_display_str = ', '.join(recipients) link_url = create_love_link(recipients_display_str, message).url return make_response( u'Love sent to {}! Share: {}'.format(recipients_display_str, link_url), LOVE_CREATED_STATUS_CODE, {} ) except TaintedLove as exc: return make_response( exc.user_message, LOVE_FAILED_STATUS_CODE if exc.is_error else LOVE_CREATED_STATUS_CODE, {} )
def plot_correlation(): """ Generate the Correlation heat map as a PNG Parameters ---------- job_id: str Returns ------- image/png """ job_id = request.args.get('job_id') if job_id is None: return None job = mongo_get_job(job_id) s3_file_key = job['s3_file_key'] data = s3_to_df(s3_file_key) fig = plot_correlation_fig(data) correlation_plot = fig_to_png(fig) response = make_response(correlation_plot.getvalue()) response.mimetype = 'image/png' return response
def convert(content_type, content): cachekey = request.path cachevalue = cache.get(cachekey) if cachevalue is None: content = decode(content) content_type = decode(content_type).decode() if content_type.startswith('image/svg+xml'): content = __svg2png(content) content_type = 'image/png' cache.set(cachekey, (content, content_type), timeout=CACHE_RETENTION) else: content = cachevalue[0] content_type = cachevalue[1] response = make_response(content) response.content_type = content_type response.cache_control.max_age = CACHE_RETENTION return response
def get_inbox(): pyldnlog.debug("Requested inbox data of {} in {}".format(request.url, request.headers['Accept'])) if not request.headers['Accept'] or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']: resp = make_response(inbox_graph.serialize(format='application/ld+json')) resp.headers['Content-Type'] = 'application/ld+json' elif request.headers['Accept'] in ACCEPTED_TYPES: resp = make_response(inbox_graph.serialize(format=request.headers['Accept'])) resp.headers['Content-Type'] = request.headers['Accept'] else: return 'Requested format unavailable', 415 resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn' resp.headers['Allow'] = "GET, HEAD, OPTIONS, POST" resp.headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type", <http://www.w3.org/ns/ldp#RDFSource>; rel="type", <http://www.w3.org/ns/ldp#Container>; rel="type", <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"' resp.headers['Accept-Post'] = 'application/ld+json, text/turtle' return resp
def get_notification(id): pyldnlog.debug("Requested notification data of {}".format(request.url)) pyldnlog.debug("Headers: {}".format(request.headers)) # Check if the named graph exists pyldnlog.debug("Dict key is {}".format(pyldnconf._inbox_url + id)) if pyldnconf._inbox_url + id not in graphs: return 'Requested notification does not exist', 404 if 'Accept' not in request.headers or request.headers['Accept'] == '*/*' or 'text/html' in request.headers['Accept']: resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format='application/ld+json')) resp.headers['Content-Type'] = 'application/ld+json' elif request.headers['Accept'] in ACCEPTED_TYPES: resp = make_response(graphs[pyldnconf._inbox_url + id].serialize(format=request.headers['Accept'])) resp.headers['Content-Type'] = request.headers['Accept'] else: return 'Requested format unavailable', 415 resp.headers['X-Powered-By'] = 'https://github.com/albertmeronyo/pyldn' resp.headers['Allow'] = "GET" return resp
def post(self, meta_type): try: if meta_type == 'item': service = self.item_meta_service new_metadata = ItemMetadata(request.get_json(), version=1, active=True) elif meta_type == 'user': service = self.user_meta_service new_metadata = UserMetadata(request.get_json(), version=1, active=True) else: raise StatusCodeException('Invalid type', 400) if not service.get_active(): service.insert(new_metadata.to_database()) return make_response(new_metadata.to_json()) else: raise StatusCodeException('Conflict', 409) except StatusCodeException as ex: return ex.to_response() except Exception as ex: return StatusCodeException(ex.message, 500).to_response()
def get(self, meta_type): try: if meta_type == 'item': json_metadata = [ItemMetadata(meta).to_json() for meta in self.item_meta_service.get_all()] elif meta_type == 'user': json_metadata = [UserMetadata(meta).to_json() for meta in self.user_meta_service.get_all()] else: raise StatusCodeException('Invalid type', 400) return make_response(json_metadata) except StatusCodeException as ex: return ex.to_response() except Exception as ex: return StatusCodeException(ex.message, 500).to_response()
def validate_json(required): def decorator(f): @wraps(f) def wrapper(*args, **kw): try: request.json except Exception: return json_response( "This endpoint requires a json request body", 400 ) for r in required.split(","): if r not in (request.json or {}): log.warning( "Required field not specified: %s, json is %s", r, request.json ) return make_response(jsonify( { "message": "Required field not specified: %s" % r, "status": 500 }), 500) return f(*args, **kw) return wrapper return decorator
def post_recipe(): payload = request.get_json() topology = payload.get("topology") scenarios = payload.get("scenarios") headers = payload.get("headers") #pattern = payload.get("header_pattern") if not topology: abort(400, "Topology required") if not scenarios: abort(400, "Failure scenarios required") if headers and type(headers)!=dict: abort(400, "Headers must be a dictionary") # if not pattern: # abort(400, "Header_pattern required") appgraph = ApplicationGraph(topology) fg = A8FailureGenerator(appgraph, a8_controller_url='{0}/v1/rules'.format(a8_controller_url), a8_controller_token=a8_controller_token, headers=headers, debug=debug) fg.setup_failures(scenarios) return make_response(jsonify(recipe_id=fg.get_id()), 201, {'location': url_for('get_recipe_results', recipe_id=fg.get_id())})
def test_make_response_with_response_instance(self): app = flask.Flask(__name__) with app.test_request_context(): rv = flask.make_response( flask.jsonify({'msg': 'W00t'}), 400) self.assertEqual(rv.status_code, 400) self.assertEqual(rv.data, b'{\n "msg": "W00t"\n}') self.assertEqual(rv.mimetype, 'application/json') rv = flask.make_response( flask.Response(''), 400) self.assertEqual(rv.status_code, 400) self.assertEqual(rv.data, b'') self.assertEqual(rv.mimetype, 'text/html') rv = flask.make_response( flask.Response('', headers={'Content-Type': 'text/html'}), 400, [('X-Foo', 'bar')]) self.assertEqual(rv.status_code, 400) self.assertEqual(rv.headers['Content-Type'], 'text/html') self.assertEqual(rv.headers['X-Foo'], 'bar')
def session_addressbook_update(session_name, address_name): try: req = request.get_json() if req is None: req = {} fw.sessions[session_name].addressbook.update( name=address_name, **req) return make_response(jsonify(message='Address updated'), status.HTTP_200_OK) except AddressNotFound as e: http_code = status.HTTP_404_NOT_FOUND except AddressNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except AddressNotUpdated as e: return make_response(jsonify(message='Address not updated'), status.HTTP_204_NO_CONTENT) except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to update address', error=e.message), http_code)
def session_service_delete(session_name, service_name): data = {} try: fw.sessions[session_name].services.delete(name=service_name) return make_response(jsonify(message='Service deleted'), status.HTTP_200_OK) except ServiceNotFound as e: http_code = status.HTTP_404_NOT_FOUND except ServiceInUse as e: http_code = status.HTTP_406_NOT_ACCEPTABLE data['deps'] = e.deps except ServiceNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to delete service', error=e.message, **data), http_code)
def session_service_update(session_name, service_name): try: req = request.get_json() if req is None: req = {} fw.sessions[session_name].services.update( name=service_name, **req) return make_response(jsonify(message='Service updated'), status.HTTP_200_OK) except ServiceNotFound as e: http_code = status.HTTP_404_NOT_FOUND except ServiceNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except ServiceNotUpdated as e: return make_response(jsonify(message='Service not updated'), status.HTTP_204_NO_CONTENT) except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to update service', error=e.message), http_code)
def session_chain_delete(session_name, table_name, chain_name): data = {} try: fw.sessions[session_name].chains.delete( name=chain_name, table=table_name) return make_response(jsonify(message='Chain deleted'), status.HTTP_200_OK) except ChainNotFound as e: http_code = status.HTTP_404_NOT_FOUND except ChainInUse as e: http_code = status.HTTP_406_NOT_ACCEPTABLE data['deps'] = e.deps except ChainNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to delete chain', error=e.message, **data), http_code)
def session_chain_update(session_name, table_name, chain_name): try: req = request.get_json() if req is None: req = {} fw.sessions[session_name].chains.update( name=chain_name, table=table_name, **req) return make_response(jsonify(message='Chain updated'), status.HTTP_200_OK) except ChainNotFound as e: http_code = status.HTTP_404_NOT_FOUND except ChainNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except ChainNotUpdated as e: return make_response(jsonify(message='Chain not updated'), status.HTTP_204_NO_CONTENT) except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to update chain', error=e.message), http_code)
def session_check_update(session_name, check_name): try: req = request.get_json() if req is None: req = {} fw.sessions[session_name].checks.update(name=check_name, **req) return make_response(jsonify(message='Rollback check updated'), status.HTTP_200_OK) except CheckNotFound as e: http_code = status.HTTP_404_NOT_FOUND except CheckNotValid as e: http_code = status.HTTP_400_BAD_REQUEST except CheckNotUpdated as e: return make_response(jsonify(message='Rollback check not updated'), status.HTTP_204_NO_CONTENT) except Exception as e: http_code = status.HTTP_500_INTERNAL_SERVER_ERROR return make_response(jsonify(message='Failed to update rollback check', error=e.message), http_code)
def update_payments(id): try: if not request.is_json: raise DataValidationError('Invalid payment: Content Type is not json') data = request.get_json(silent=True) message = payment_service.update_payment(id,payment_replacement=data) rc = status.HTTP_200_OK except PaymentNotFoundError as e: message = e.message rc = status.HTTP_404_NOT_FOUND except DataValidationError as e: message = e.message rc = status.HTTP_400_BAD_REQUEST return make_response(jsonify(message), rc) ###################################################################### # UPDATE AN EXISTING PAYMENT PARTIALLY ######################################################################
def charge_payment(user_id): try: if not request.data: raise DataValidationError('Invalid request: body of request contained no data') if not request.is_json: raise DataValidationError('Invalid request: request not json') data = request.get_json() if data['amount']: if(data['amount'] < 0): raise DataValidationError('Invalid request: Order amount is negative.') else: resp = payment_service.perform_payment_action(user_id,payment_attributes=data) if resp == True: message = {'success' : 'Default payment method for user_id: %s has been charged $%.2f' % (str(user_id), data['amount'])} rc = status.HTTP_200_OK except DataValidationError as e: message = {'error' : e.message} rc = status.HTTP_400_BAD_REQUEST except KeyError as e: message = {'error' : 'Invalid request: body of request does not have the amount to be charged'} rc = status.HTTP_400_BAD_REQUEST return make_response(jsonify(message), rc)
def home(): '''Home page of the service.''' return flask.make_response( 'Hello World, I\'m FoodAId.', 200)
def kill(): '''Stops the bot thread.''' BOT.is_running = False return flask.make_response( '@FoodAId:\n' + 'I\'m sad.\n' + 'Why do I have to die?\n' + 'Why does anyone have to die?', 200)
def get(self): resp = make_response(redirect('/login/')) session.pop('username', None) session.pop('nickname', None) session.pop('description', None) session.pop('avatar', None) session.pop('status', None) session.pop('usergroup', None) session.pop('token', None) resp.set_cookie('docklet-jupyter-cookie', '', expires=0) return resp
def logs_get(filename): data = { "filename": filename } result = dockletRequest.post('/logs/get/', data).get('result', '') response = make_response(result) response.headers["content-type"] = "text/plain" return response
def bytes(n): global max_size n = min(n, max_size) response = make_response() response.data = bytearray(randint(0, 255) for i in range(n)) response.content_type = 'application/octet-stream' response.status_code = 200 return response
def status_code(code): r = make_response() r.status_code = int(code) return r
def post(self): # get the post data post_data = request.get_json() # check if user already exists user = User.query.filter_by(email=post_data.get('email')).first() if not user: try: user = User( email=post_data.get('email'), password=post_data.get('password') ) # insert the user db.session.add(user) db.session.commit() # generate the auth token auth_token = user.encode_auth_token(user.id) responseObject = { 'status': 'success', 'message': 'Successfully registered.', 'auth_token': auth_token.decode() } return make_response(jsonify(responseObject)), 201 except Exception as e: responseObject = { 'status': 'fail', 'message': 'Some error occurred. Please try again.' } return make_response(jsonify(responseObject)), 401 else: responseObject = { 'status': 'fail', 'message': 'User already exists. Please Log in.', } return make_response(jsonify(responseObject)), 202
def post(self): # get the post data post_data = request.get_json() try: # fetch the user data user = User.query.filter_by( email=post_data.get('email') ).first() if user and bcrypt.check_password_hash( user.password, post_data.get('password') ): auth_token = user.encode_auth_token(user.id) if auth_token: responseObject = { 'status': 'success', 'message': 'Successfully logged in.', 'auth_token': auth_token.decode() } return make_response(jsonify(responseObject)), 200 else: responseObject = { 'status': 'fail', 'message': 'User does not exist.' } return make_response(jsonify(responseObject)), 404 except Exception as e: print(e) responseObject = { 'status': 'fail', 'message': 'Try again' } return make_response(jsonify(responseObject)), 500
def get(self): # get the auth token auth_header = request.headers.get('Authorization') if auth_header: try: auth_token = auth_header.split(" ")[1] except IndexError: responseObject = { 'status': 'fail', 'message': 'Bearer token malformed.' } return make_response(jsonify(responseObject)), 401 else: auth_token = '' if auth_token: resp = User.decode_auth_token(auth_token) if not isinstance(resp, str): user = User.query.filter_by(id=resp).first() responseObject = { 'status': 'success', 'data': { 'user_id': user.id, 'email': user.email, 'admin': user.admin, 'registered_on': user.registered_on } } return make_response(jsonify(responseObject)), 200 responseObject = { 'status': 'fail', 'message': resp } return make_response(jsonify(responseObject)), 401 else: responseObject = { 'status': 'fail', 'message': 'Provide a valid auth token.' } return make_response(jsonify(responseObject)), 401
def post(self): # get auth token auth_header = request.headers.get('Authorization') if auth_header: auth_token = auth_header.split(" ")[1] else: auth_token = '' if auth_token: resp = User.decode_auth_token(auth_token) if not isinstance(resp, str): # mark the token as blacklisted blacklist_token = BlacklistToken(token=auth_token) try: # insert the token db.session.add(blacklist_token) db.session.commit() responseObject = { 'status': 'success', 'message': 'Successfully logged out.' } return make_response(jsonify(responseObject)), 200 except Exception as e: responseObject = { 'status': 'fail', 'message': e } return make_response(jsonify(responseObject)), 200 else: responseObject = { 'status': 'fail', 'message': resp } return make_response(jsonify(responseObject)), 401 else: responseObject = { 'status': 'fail', 'message': 'Provide a valid auth token.' } return make_response(jsonify(responseObject)), 403 # define the API resources
def login(self): if self.is_authenticated(): return redirect(url_for('get_config_site')) if request.method == "GET": return render_template('login.html') if request.form.get('password', None) == config.get('CONFIG_PASSWORD', None): resp = make_response(redirect(url_for('get_config_site'))) resp.set_cookie('auth', config['AUTH_KEY']) return resp
def api(file): if file == 'all': return all elif file == 'now': return now else: return make_response('404 Not Found', 404)