我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用flask.request.json（注意：它是一个属性而不是方法，访问时无需加括号）。
def post(self):
    """Login the user.

    Reads ``username`` and ``password`` from the JSON request body, looks
    up an enabled user with that ``sigaa_user_name`` and verifies the
    password hash. On success a JWT holding the user id is returned through
    ``msg(token, 'token')``; a wrong password yields the generic
    'Username or password incorrect' message with status 403.
    """
    username = request.json['username']
    password = request.json['password']

    # ``User.disabled is False`` is a plain Python identity test on the
    # column object, so the filter received a constant instead of a SQL
    # comparison. Build the comparison with ``==`` so it reaches the database.
    us = User.query\
        .filter(User.disabled == 0)\
        .filter(User.sigaa_user_name == username)\
        .first()
    # Presumably aborts with 403 when no user was found.
    abort_if_none(us, 403, 'Username or password incorrect')

    if not check_password_hash(us.password, password):
        return msg('Username or password incorrect'), 403

    token = jwt.encode(
        {'id_user': us.id_user, 'tid': random.random()},
        config.SECRET_KEY,
        algorithm='HS256'
    ).decode('utf-8')
    return msg(token, 'token')
def add_customer():
    """Store a customer taken from the JSON request body.

    Every field listed below is required in the body. The inserted
    document is read back and its first and last name are returned as
    ``{'result': {...}}``.
    """
    collection = mongo.db.customers
    field_names = (
        'first_name',
        'last_name',
        'date_of_birth',
        'is_new',
        'country',
        'city',
        'is_online',
    )
    document = {field: request.json[field] for field in field_names}

    # NOTE(review): ``Collection.insert`` is deprecated in PyMongo 3 and
    # removed in PyMongo 4 - confirm the pinned driver version.
    inserted_id = collection.insert(document)
    stored = collection.find_one({'_id': inserted_id})

    summary = {
        'first_name': stored['first_name'],
        'last_name': stored['last_name'],
    }
    return jsonify({'result': summary})
def test_pagination(api, client):
    """Check that a resource with ``per_page = 20`` returns 20 items per page."""
    from flask_restler import Resource

    items = [number for number in range(1, 100)]

    @api.connect
    class TestResource(Resource):

        class Meta:
            per_page = 20

        def get_many(self, **kwargs):
            return items

    default_page = client.get('/api/v1/test')
    assert len(default_page.json) == 20

    # ``page=2`` is expected to start at item 41.
    requested_page = client.get('/api/v1/test?page=2')
    assert len(requested_page.json) == 20
    assert requested_page.json[0] == 41
def set_config():
    """Update the module-level ``config`` from the JSON request body.

    Aborts with 400 when the body is missing or empty. All keys listed
    below are required; if any is missing the string ``"wrong"`` is
    returned and ``config`` is left untouched. On success the updated
    configuration is returned as JSON with status 201.
    """
    if not request.json:
        abort(400)

    keys = (
        'host',
        'port',
        'version',
        'branchID',
        'accountNo',
        'tradeAccountNo',
        'password',
        'txPassword',
    )
    try:
        # Collect everything first so a missing key cannot leave ``config``
        # half-updated.
        updates = [(key, request.json[key]) for key in keys]
    except (KeyError, TypeError):
        # KeyError: a required key is absent; TypeError: body is not a mapping.
        return "wrong"

    # The original wrote ``config[key]: value`` - an annotation, not an
    # assignment - so no value was ever stored.
    for key, value in updates:
        config[key] = value
    return jsonify(config), 201
def webhook_callback():
    """Route each messaging event of a ``page`` webhook payload.

    ``message`` events go to ``received_message`` and ``postback`` events to
    ``received_postback``; every other event kind is only logged. An empty
    200 response is returned in all cases.
    """
    payload = request.json
    if data_is_page := (payload['object'] == 'page'):
        for entry in payload['entry']:
            page_id = entry['id']
            time_of_event = entry['time']

            for event in entry['messaging']:
                # First matching kind wins, in the same order as before.
                if 'optin' in event:
                    LOGGER.info('Webhook received message event: option from page %s at %d', page_id, time_of_event)
                    continue
                if 'message' in event:
                    received_message(event)
                    continue
                if 'delivery' in event:
                    LOGGER.info('Webhook received message event: delivery from page %s at %d', page_id, time_of_event)
                    continue
                if 'postback' in event:
                    received_postback(event)
                    continue
                if 'read' in event:
                    LOGGER.info('Webhook received message event: read from page %s at %d', page_id, time_of_event)
                    continue
                if 'account_linking' in event:
                    LOGGER.info('Webhook received message event: account linking from page %s at %d', page_id, time_of_event)
                    continue
                LOGGER.info('Webhook received unknown message event: %s from page %s at %d', event, page_id, time_of_event)

    return make_response('', 200)
def put(self):
    """Change the password of the current user.

    The JSON body must carry ``old_password`` and ``password``. A wrong old
    password yields status 403. After the change is committed, the request's
    ``Authorization`` header value is appended to
    ``cache.blacklisted_tokens``.
    """
    query = User.query.filter(User.disabled == 0)
    query = query.filter(User.id_user == g.current_user)
    us = query.first()
    abort_if_none(us, 404, 'User not found')

    old_password_matches = check_password_hash(us.password, request.json['old_password'])
    if not old_password_matches:
        return msg('Old password incorrect'), 403

    # NOTE(review): the new password is assigned as received; presumably the
    # model hashes it on assignment - confirm against the User model.
    us.password = request.json['password']
    db.session.commit()

    cache.blacklisted_tokens.append(request.headers['Authorization'])
    return msg('success!')
def support_jsonp(f):
    """
    Decorator that wraps a view's JSON output for JSONP.

    When the request has a ``callback`` query argument, the decoded response
    body of ``f`` is wrapped as ``callback(body)``; otherwise the result of
    ``f`` is returned unchanged.
    https://gist.github.com/richardchien/7b7c2727feb3e8993845a3ce61bad808
    """
    @wraps(f)
    def jsonp_wrapper(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)

        prefix = str(callback)
        body = f(*args, **kwargs).data.decode('utf-8')
        content = prefix + '(' + body + ')'
        return current_app.response_class(content, mimetype='application/json')

    return jsonp_wrapper
def get_job(request_id):
    """Return a JSON description of the job identified by ``request_id``.

    For jobs whose status is not ``"done"``, the scheduling details are
    decoded from the five-element JSON array stored in ``job.args``; for
    finished jobs those fields keep their defaults.
    """
    job = _get_job(request_id)
    status = job.status

    details = {
        'request_id': request_id,
        'status': status,
        'run_at': None,
        'scheduled_at': None,
        'last_retry': None,
        'retries_left': 0,
    }

    if status != "done":
        _taskid, _fname, _call_args, kwargs, run_at = json.loads(job.args)
        details['scheduled_at'] = kwargs['scheduled_at']
        details['last_retry'] = kwargs.get('last_retry')
        details['retries_left'] = kwargs['_attempts']
        details['run_at'] = run_at

    return jsonify(details)
def _process_request(self, request): self._logger.info('[Ghost2logger]: Received Request') if request.headers['Content-Type'] == 'application/json': payload = request.json if 'pattern' and 'host' and 'hostpattern' and 'message' in payload: self._logger.info(request.json) self._logger.debug('[ghost2logger_sensor]: processing request' ' {}'.format(payload)) self._sensor_service.dispatch(trigger=self._pmatch, payload=payload) return ('ok', 200) else: return ('fail', 415) else: return ('fail', 415)
def api_post_sample_family(sid):
    """Attach sample ``sid`` to a family named in the JSON body.

    The family is looked up by ``family_id`` when present, otherwise by
    ``family_name``. When neither key is given ``{'result': False}`` is
    returned. A missing JSON body aborts with 400.
    """
    samp = api.get_elem_by_type("sample", sid)

    payload = request.json
    if payload is None:
        abort(400, "JSON not provided")

    keys = payload.keys()
    if "family_id" in keys:
        fam = api.get_elem_by_type("family", payload['family_id'])
    elif "family_name" in keys:
        fam = api.familycontrol.get_by_name(payload['family_name'])
    else:
        return jsonify({'result': False})

    return jsonify({'result': api.familycontrol.add_sample(samp, fam)})
def api_create_struct_member(sid, struct_id):
    """
    Add a new member to a structure.

    The JSON body must provide ``name``, ``size`` and ``offset``. The
    response is ``{'result': ...}``, where the value is ``False`` when the
    member could not be created, otherwise whatever
    ``add_member_to_struct`` returns.
    """
    payload = request.json
    if payload is None:
        abort(400, "Missing JSON data")

    name, size, offset = payload["name"], payload["size"], payload["offset"]
    mid = api.idacontrol.create_struct_member(name=name,
                                              size=size,
                                              offset=offset)

    result = False if mid is None else api.idacontrol.add_member_to_struct(struct_id, mid)
    return jsonify({'result': result})
def api_create_yara():
    """
    Add a new yara

    @arg name: the yara name
    @arg rule: the full text of the rule
    @arg tlp_level: Optional, the sensibility of the rule. Default = TLP AMBER

    Aborts with 400 when no JSON body was sent and with 500 when the rule
    could not be created. Returns ``{"id": <new rule id>}`` on success.
    """
    data = request.json
    # Same guard as the sibling JSON handlers: without it a missing body
    # surfaced as a TypeError (HTTP 500) on the first subscript.
    if data is None:
        abort(400, "Missing JSON data")

    name = data["name"]
    rule = data["rule"]

    # An absent key and an explicit null both fall back to the default.
    tlp_level = data.get("tlp_level")
    if tlp_level is None:
        tlp_level = TLPLevel.TLPAMBER

    result = api.yaracontrol.create(name, rule, tlp_level)
    if not result:
        abort(500, "Cannot create yara rule")
    return jsonify({"id": result.id})
def check_trigger_rebuild(json):
    """Decide from a push payload whether the character must be rebuilt.

    A rebuild is wanted when the push targets ``refs/heads/master`` and
    either touches a ``.top`` file or is a forced push. When wanted, and
    both ``test_repo`` and ``update_repo`` succeed, the rebuild is started
    on a background thread.

    :param json: decoded webhook payload (mapping).
    :returns: ``True`` if a rebuild thread was started, else ``False``.
    """
    branch = 'master'
    tracked_ref = 'refs/heads/{}'.format(branch)
    should_rebuild = False

    if 'ref' in json and json['ref'] == tracked_ref:
        for commit in (json['commits'] if 'commits' in json else ()):
            touched = commit['added'] + commit['removed'] + commit['modified']
            for path in touched:
                if os.path.splitext(path)[1] == '.top':
                    should_rebuild = True
        if json['forced']:
            should_rebuild = True

    if not should_rebuild:
        return False

    if test_repo('loving-ai', branch) and update_repo('loving-ai', branch):
        # NOTE(review): ``revision`` is not defined in this function;
        # presumably a module-level name - confirm.
        worker = threading.Thread(
            target=rebuild_cs_character,
            kwargs={'revision': revision, 'botname': BOTNAME})
        worker.start()
        return True
    return False
def __call__(self, form, field):
    """Validate the reCAPTCHA answer submitted with the current request.

    Validation is skipped (returns ``True``) while the app is in testing
    mode. The answer is read from the JSON body when there is one, else
    from the form data. Raises ``ValidationError`` when the answer is
    missing or is rejected by ``_validate_recaptcha``.
    """
    if current_app.testing:
        return True

    submitted = request.json if request.json else request.form
    response = submitted.get('g-recaptcha-response', '')
    remote_ip = request.remote_addr

    if not response:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(response, remote_ip):
        return

    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def new_user():
    """
    Creates a new user.

    Expects ``username``, ``password``, ``name`` and ``email`` in the JSON
    body. Aborts with 400 when the body or a field is missing, or when the
    username is taken. On success returns the username with status 201 and
    a ``Location`` header pointing at the new user.
    """
    if not request.json:
        abort(400)

    username, password, name, email = (
        request.json.get(key)
        for key in ('username', 'password', 'name', 'email')
    )
    if not all((username, password, name, email)):
        abort(400, 'Username, password, name or email is not given.')

    existing = User.query.filter_by(username=username).first()
    if existing is not None:
        abort(400, 'Username is already in use.')

    user = User(username=username)
    user.hash_password(password)
    user.name = name
    user.email = email
    db.session.add(user)
    db.session.commit()

    location = url_for('get_user', id=user.id, _external=True)
    return (jsonify({'username': user.username}), 201, {'Location': location})
def create_blog_post():
    """
    Creates a new blog post.

    ``title`` and ``content`` are required in the JSON body and ``tags`` is
    optional; the author is taken from ``g.user``. Aborts with 400 when the
    body, the title or the content is missing.
    """
    if not request.json:
        abort(400)

    body = request.json
    title = body.get('title')
    content = body.get('content')
    tags = body.get('tags')
    author = g.user

    if not (title and content):
        abort(400)

    post = BlogPost()
    post.title, post.content = title, content
    if tags:
        post.tags = tags
    post.author = author.id

    db.session.add(post)
    db.session.commit()

    confirmation = {
        "status": "Created",
        "code": 201,
        "message": "Blog post created!"
    }
    return jsonify(confirmation), 201
def validate_json(required):
    """Build a decorator that requires fields in the JSON request body.

    :param required: comma-separated field names.

    The decorated view runs only when the body parses and contains every
    listed field. A body that fails to parse yields a 400 response and a
    missing field yields a 500 response naming that field.
    """
    def decorator(f):
        @wraps(f)
        def checked_view(*args, **kw):
            try:
                request.json
            except Exception:
                return json_response(
                    "This endpoint requires a json request body", 400
                )

            missing = next(
                (name for name in required.split(",")
                 if name not in (request.json or {})),
                None,
            )
            if missing is not None:
                log.warning(
                    "Required field not specified: %s, json is %s",
                    missing, request.json
                )
                error_body = {
                    "message": "Required field not specified: %s" % missing,
                    "status": 500
                }
                return make_response(jsonify(error_body), 500)

            return f(*args, **kw)
        return checked_view
    return decorator
def create_new_match():
    """Create a new match for the requesting player.

    Aborts with the validation code when ``auth.validate`` does not report
    ``'ok'``, with 403 when the player has no free slots, and with 400 for
    an unavailable map or an invalid side. Returns the new match code.
    """
    auth_result = auth.validate(request)
    if auth_result['status'] != 'ok':
        abort(auth_result['code'])

    body = request.json
    player_id, map_code, side = body['player_id'], body['map_code'], body['side']

    if not match_validator.are_slots_available(player_id):
        abort(403)
    # Both remaining checks map to 400 and are evaluated in order.
    if not match_validator.is_map_available(map_code) or not match_validator.is_side_valid(side):
        abort(400)

    match_code = match_model.create_new_match(player_id, side, map_code)
    return jsonify({'match_code': match_code})
def join_match(match_code):
    """Add the requesting player to the match ``match_code``.

    Aborts with 403 when the player has no free slots or the match is not
    joinable, with 400 when the player is already in the match, and with
    500 when adding the player fails.
    """
    auth_result = auth.validate(request)
    if auth_result['status'] != 'ok':
        abort(auth_result['code'])

    player_id = request.json['player_id']

    has_free_slot = match_validator.are_slots_available(player_id)
    if not has_free_slot:
        abort(403)

    already_joined = match_validator.is_in_match(player_id, match_code)
    if already_joined:
        abort(400)

    joinable = match_validator.is_match_joinable(match_code)
    if not joinable:
        abort(403)

    added = match_model.add_player_to_match(player_id, match_code)
    if not added:
        abort(500)

    return jsonify({'status': 'ok'})
def abandon_match(match_code):
    """Remove the requesting player from the match ``match_code``.

    Aborts with 403 when the player is not part of that match.
    """
    auth_result = auth.validate(request)
    if auth_result['status'] != 'ok':
        abort(auth_result['code'])

    player_id = request.json['player_id']
    is_member = match_validator.is_in_match(player_id, match_code)
    if not is_member:
        abort(403)

    match_model.abandon_match(match_code, player_id)
    return jsonify({'status': 'ok'})
def put(self, plugin_uuid=None):
    """Move a plugin to the ``target_state`` given in the request body.

    Supported states are ``start``, ``pause`` and ``stop``; each sends the
    matching notification through ``PM``. Returns 404 when the plugin does
    not exist and 500 for a missing or unknown target state.
    """
    LOG.debug("PUT plugin lifecycle: %r" % plugin_uuid)
    try:
        plugin = model.Plugin.objects.get(uuid=plugin_uuid)
        # get target state from request body
        # NOTE(review): ``request.json`` is decoded a second time here, so
        # the body is presumably a JSON-encoded string - confirm.
        target_state = json.loads(request.json).get("target_state")
        if target_state is None:
            LOG.error("Malformed request: %r" % request.json)
            return {"message": "malformed request"}, 500

        if target_state == "start":
            PM.send_start_notification(plugin)
            return {}, 200
        if target_state == "pause":
            PM.send_pause_notification(plugin)
            return {}, 200
        if target_state == "stop":
            PM.send_stop_notification(plugin)
            return {}, 200
        return {"message": "Malformed request"}, 500
    except DoesNotExist:
        LOG.error("Lookup error: %r" % plugin_uuid)
        return {}, 404
# reference to plugin manager
def setup_logging(default_path='logging.json', default_level=logging.INFO, env_key='LOG_CFG'):
    """Set up the logging configuration.

    The config file path comes from the environment variable ``env_key``
    when it is set and non-empty, otherwise from ``default_path``. An
    existing file is loaded as JSON and applied with ``dictConfig``;
    otherwise ``basicConfig`` is used with ``default_level``. A UDP datagram
    handler targeting localhost is then added to the root logger.
    """
    path = os.getenv(env_key, None) or default_path

    if not os.path.exists(path):
        logging.basicConfig(level=default_level)
    else:
        with open(path, 'rt') as config_file:
            config = json.load(config_file)
        logging.config.dictConfig(config)

    udp_handler = logging.handlers.DatagramHandler(
        'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
    logging.getLogger('').addHandler(udp_handler)
def post(self, username):
    """Crawl the 'poj' account of ``username`` and return the stored record.

    The account password is read from the JSON body. After crawling, the
    user document is looked up in the ``users`` collection. A dict with
    status 404 is returned when nothing is stored for that user.
    """
    crawler = AccountCrawler()
    crawler.crawl('poj', username, request.json['password'])

    client = pymongo.MongoClient(config.MONGO_URI)
    users = client[config.MONGO_DATABASE]['users']
    user_info = users.find_one({'oj': 'poj', 'username': username})
    client.close()

    if user_info is None:
        return {'status': 404, 'message': 'not found'}

    profile = {'username': user_info['username'], 'status': 200}
    for field in ('submit', 'oj', 'accept', 'rank'):
        profile[field] = user_info[field]
    profile['solved'] = dict(user_info['solved'])
    return profile
def send_image_message(recipient_id, message):
    """POST an image attachment for ``recipient_id`` to ``base_url``.

    :param recipient_id: recipient identifier; sent as a string.
    :param message: URL of the image to attach.
    :returns: the decoded JSON body of the HTTP response.
    """
    attachment = {"type": "image", "payload": {"url": message}}
    payload = {
        'recipient': {'id': str(recipient_id)},
        'message': {"attachment": attachment},
    }
    return requests.post(base_url, json=payload).json()
def distinctService():
    """Run the ``distinct`` service on the JSON request body.

    Aborts with 400 when no JSON body was sent. Returns a 500 JSON error
    when ``checkES`` or ``checkData`` fails, or when the result contains an
    ``error`` key; otherwise returns the result with status 200.
    """
    logger.info('distinct service requested')

    if not request.json:
        logger.error('no JSON request sent to distinct')
        abort(400)

    if not checkES():
        logger.error('elasticsearch not available')
        return jsonify({'error': 'elasticsearch not available'}), 500

    if not checkData([]):
        logger.error('no data')
        return jsonify({'error': 'no data available'}), 500

    result = distinct.main(request.json)
    failed = 'error' in result
    if failed:
        logger.error('distinct failed')
    return jsonify(result), (500 if failed else 200)
def _safe_index_json(json_, key, warning_message): """ Safely index the given JSON object :param json_: JSON object :param key: Key to inspect in JSON object :param warning_message: Warning message to log if key is missing :returns: None if not found or value at key """ try: return json_[key] except KeyError: app.logger.warning('%s, json:%s', warning_message, json.dumps(request.json)) return None
def predict():
    """
    API route for predicting the category of the supplied text.

    The request should have type set to application/json and the provided
    JSON should have a ``document`` attribute containing the text for which
    we want to predict the category. Any failure is reported as a JSON
    error with status 500.
    """
    try:
        document = request.json['document']
        prediction = ct.predict(document)
        category = str(prediction[0])
        return jsonify(category=category, status=200)
    except Exception:
        return jsonify(error=True, status=500, message='Getting the prediction failed')
def probabilities():
    """
    API route for getting the probabilities per category of the supplied
    text.

    The request should have type set to application/json and the provided
    JSON should have a ``document`` attribute containing the text for which
    we want to get the probabilities per category. Any failure is reported
    as a JSON error with status 500.
    """
    try:
        probabilities = ct.probability_per_category(request.json['document'])
        return jsonify(probabilities=probabilities, status=200)
    # ``Exception`` instead of a bare ``except`` so SystemExit and
    # KeyboardInterrupt are no longer swallowed.
    except Exception:
        return jsonify(error=True, status=500, message='Getting the probabilities failed')
def handle_incoming_messages(): """ Handles FB verification. """ log.debug('incoming message from Facebook') dispatcher.init() talkback_app = dispatcher.get_default_app() from talkback.backends.messenger import MessengerSession session = None try: data = request.json log.debug('data is: %s' % unicode(data)) print 'messenger data is:',data sender = data['entry'][0]['messaging'][0]['sender']['id'] if 'postback' in data['entry'][0]['messaging'][0]: postback = data['entry'][0]['messaging'][0]['postback']['payload'] process_postback(talkback_app,sender,postback) else: message = data['entry'][0]['messaging'][0]['message']['text'] intent = talkback_app.intent_for(message) with intention(intent): session = Session(MessengerSession(sender)) intent.invoke(session) except KeyError: log.info('Messageless request from user %s' % sender) except Termination: log.info('Session terminated') if session: session.speak('Bye!') return 'ok'
def to_json_response(self, response, headers=None):
    """Serialize a simple response into a Flask JSON response.

    When ``self.raw`` is truthy the response is returned untouched.
    Otherwise it is dumped with an indent of 2 into the app's response
    class, and ``headers`` (if given) are added to the result.
    """
    if self.raw:
        return response

    body = dumps(response, indent=2)
    flask_response = current_app.response_class(body, mimetype='application/json')
    if headers:
        flask_response.headers.extend(headers)
    return flask_response
def post(self, **kwargs):
    """Create a resource from the JSON body (empty dict when absent)."""
    payload = request.json or {}
    loaded = self.load(payload, **kwargs)
    saved = self.save(loaded)
    logger.debug('Create a resource (%r)', kwargs)
    return self.to_simple(saved, **kwargs)
def test_resource(app, api, client):
    """Check routing of two connected resources and the 405 for POST."""
    from flask_restler import Resource

    @api.connect
    class HelloResource(Resource):

        def get(self, resource=None, **kwargs):
            who = resource.title() if resource else 'World'
            return 'Hello, %s!' % who

    @api.connect('/hello/<name>/how-are-you')
    class HowAreYouResource(Resource):

        def get(self, resource=None, name=None, **kwargs):
            return 'Hello, %s! How are you?' % name.title()

    greeting = client.get('/api/v1/hello')
    assert greeting.json == 'Hello, World!'

    named_greeting = client.get('/api/v1/hello/mike')
    assert named_greeting.json == 'Hello, Mike!'

    rejected = client.post('/api/v1/hello')
    assert rejected.status_code == 405

    question = client.get('/api/v1/hello/mike/how-are-you')
    assert question.json == 'Hello, Mike! How are you?'
def _post_args(self):  # pylint: disable=no-self-use
    # pylint: disable=maybe-no-member
    """ Return action and args after parsing request.

    ``action`` comes from the JSON body, falling back to the form data and
    then to an empty string. ``args`` comes from the JSON body and gets a
    ``file`` entry when the request carries an uploaded file of that name.
    """
    body = request.json or {}
    params = api_utils.change_to_str_in_dict(body)

    form_action = request.form.get('action', '')
    action = params.get('action', form_action)
    args = params.get('args', {})

    try:
        uploaded = request.files['file']
    except KeyError:
        pass
    else:
        args['file'] = uploaded

    LOGGER.debug('Input args are: action: %s, args: %s', action, args)
    return action, args
def main():
    """Pass the JSON request body to the policy and return its reply.

    The reply of ``policy.handle`` is dumped to JSON and returned as
    UTF-8 encoded bytes.
    """
    incoming = flask_request.json
    policy = Policy.initialize()
    outcome = policy.handle(incoming, settings.vi)
    serialized = json.dumps(outcome)
    return serialized.encode('utf-8')
def put(self, id):
    '''Update an event by ID using the fields of the JSON body'''
    query = Event.query.filter(Event.disabled == 0)
    query = query.filter(Event.id_event == id)
    event = query.first()
    abort_if_none(event, 404, 'Not Found')

    fill_object(event, request.json)
    db.session.commit()
    return msg('success!')
def post(self):
    '''Create a new event.

    Each entry of ``tags`` in the JSON body is matched by name against the
    enabled tags; an existing tag is reused and a missing one is created
    from the entry. The remaining body fields are applied to the event.
    '''
    event = Event()

    # Copy the tag list, then drop it from the body so it is not applied
    # to the event by ``fill_object`` below.
    tag_specs = request.json['tags'][:]
    del request.json['tags']

    for spec in tag_specs:
        tag = (
            Tag.query
            .filter(Tag.name == spec['name'])
            .filter(Tag.disabled == 0)
            .first()
        )
        if tag is None:
            tag = Tag()
            fill_object(tag, spec)
        event.tags.append(tag)

    fill_object(event, request.json)

    # submit objects to db
    db.session.add(event)
    db.session.commit()
    return msg(event.id_event, 'id')
def put(self, id):
    '''Update an event_type by ID using the fields of the JSON body'''
    event_type = (
        EventType.query
        .filter(EventType.disabled == 0)
        .filter(EventType.id_event_type == id)
        .first()
    )
    abort_if_none(event_type, 404, 'Not Found')

    fill_object(event_type, request.json)
    db.session.commit()
    return msg('altered')
def post(self):
    '''Create a new event_type.

    Builds the event type from ``name`` and ``description`` in the JSON
    body, stores it and returns its new id via ``msg(..., 'id')``.
    '''
    et = EventType(request.json['name'], request.json['description'])
    # The original went on to call ``et.first()`` and ``abort_if_none`` on
    # this new instance. ``first()`` is what the update handler calls on a
    # query, so those lines look copied from it and were removed.
    db.session.add(et)
    db.session.commit()
    return msg(et.id_event_type, 'id')
def put(self, id):
    """Update an enabled user by ID using the fields of the JSON body"""
    query = User.query.filter(User.disabled == 0)
    query = query.filter(User.id_user == id)
    user = query.first()
    abort_if_none(user, 404, 'not found')

    fill_object(user, request.json)
    db.session.commit()
    return msg('success!')
def post(self):
    """Create a new user from the JSON body and return its id"""
    new_user = User()
    fill_object(new_user, request.json)

    session = db.session
    session.add(new_user)
    session.commit()
    return msg(new_user.id_user, 'id')
def json_resp(thing):
    """Build a response carrying ``thing`` serialized as indented JSON.

    When the request path ends in ``.json`` and the ``FLASK_DEBUG``
    environment variable equals ``"True"``, the JSON is rendered through
    the ``debug_api.html`` template as HTML. Otherwise it is returned as
    ``application/json`` with status 200.
    """
    json_str = json.dumps(thing, sort_keys=True, default=json_dumper, indent=4)

    wants_debug_page = request.path.endswith(".json") and (os.getenv("FLASK_DEBUG", False) == "True")
    if not wants_debug_page:
        resp = make_response(json_str, 200)
        resp.mimetype = "application/json"
        return resp

    logger.info(u"rendering output through debug_api.html template")
    resp = make_response(render_template(
        'debug_api.html',
        data=json_str))
    resp.mimetype = "text/html"
    return resp
def abort_json(status_code, msg):
    """Abort the request with a JSON error body.

    The body carries the status code, the message and ``"error": true``.
    """
    payload = json.dumps(
        {
            "HTTP_status_code": status_code,
            "message": msg,
            "error": True
        },
        sort_keys=True,
        indent=4,
    )
    error_response = make_response(payload, status_code)
    error_response.mimetype = "application/json"
    abort(error_response)
def log_request(resp):
    """Send a log record for a ``get_doi_endpoint`` response to Loggly.

    Requests to any other endpoint, and responses whose body has no usable
    first ``results`` entry, are ignored. The record is tagged with the
    result's ``oa_color`` (``"gray"`` when empty).

    :param resp: response object providing ``get_data()`` and ``status_code``.
    """
    if request.endpoint != "get_doi_endpoint":
        return

    try:
        results = json.loads(resp.get_data())["results"][0]
    except (ValueError, RuntimeError, KeyError, IndexError, TypeError):
        # don't bother logging if no results; IndexError covers an empty
        # "results" list and TypeError a body that is not a JSON object
        return

    oa_color = results["oa_color"] or "gray"

    body = {
        "timestamp": datetime.utcnow().isoformat(),
        "elapsed": elapsed(g.request_start_time, 2),
        "ip": get_ip(),
        "status_code": resp.status_code,
        "email": request.args.get("email", None),
        "doi": results["doi"],
        "year": results.get("year", None),
        "oa_color": oa_color
    }
    h = {
        "content-type": "text/json",
        "X-Forwarded-For": get_ip()
    }

    url = "http://logs-01.loggly.com/inputs/6470410b-1d7f-4cb2-a625-72d8fa867d61/tag/{}/".format(
        oa_color)
    requests.post(url, headers=h, data=json.dumps(body))
def get_doi_endpoint(doi):
    """GET api endpoint: return the publication for ``doi`` as JSON.

    The publication dict is wrapped as the only item of ``results``.
    """
    publication = get_pub_from_doi(doi)
    results = [publication.to_dict()]
    return jsonify({"results": results})
def post_gs_cache_endpoint():
    """Pass the JSON body as keyword arguments to ``post_gs_cache``.

    Returns the resulting object's ``to_dict()`` as JSON.
    """
    cache_entry = post_gs_cache(**request.json)
    return jsonify(cache_entry.to_dict())
def restart_endpoint():
    """Restart the dynos named in the posted ``payload`` form field.

    The request is accepted only when one of its query-argument values
    equals the ``HEROKU_API_KEY`` environment variable. Each dyno named by
    an event's ``program`` field is restarted once.
    """
    logger.info(u"in restart endpoint")

    allowed_to_reboot = any(
        value == os.getenv("HEROKU_API_KEY")
        for (_key, value) in request.args.iteritems()
    )
    if not allowed_to_reboot:
        logger.info(u"not allowed to reboot in restart_endpoint")
        return jsonify({
            "response": "not allowed to reboot, didn't send right heroku api key"
        })

    payload_json = json.loads(request.form["payload"])

    # A set, so each dyno is restarted only once.
    dynos_to_restart = set()
    for event in payload_json["events"]:
        program = event["program"]
        logger.info(u"dyno {}".format(program))
        dynos_to_restart.add(event["program"].split("/")[1])

    logger.info(u"restarting dynos: {}".format(dynos_to_restart))
    for dyno_name in dynos_to_restart:
        restart_dyno("oadoi", dyno_name)

    return jsonify({
        "response": "restarted dynos: {}".format(dynos_to_restart)
    })
def webhook():
    """Forward the request payload to the optional custom webhook handler.

    The handler module is imported on each call; when the import fails a
    notice is printed. The JSON body is passed when it is truthy, else the
    form data. Always answers with an empty 204.
    """
    try:
        from custom import webhook_handler as handler_module
        if handler_module.handle:
            payload = request.json or request.form
            handler_module.handle(payload)
    except ImportError:
        print('There is no webhook handler.')
    return '', 204