我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用flask.request.is_json()。
def before_request(): request_param = {key.lower(): value for key, value in request.environ.items() if key in ('CONTENT_TYPE', 'CONTENT_LENGTH', 'HTTP_HOST', 'HTTP_ACCEPT', 'HTTP_ACCEPT_ENCODING', 'HTTP_COOKIE', 'HTTP_USER_AGENT', 'PATH_INFO', 'QUERY_STRING', 'SERVER_PROTOCOL', 'REQUEST_METHOD', 'HTTP_HOST', 'SERVER_PORT', 'SERVER_SOFTWARE', 'REMOTE_ADDR', 'REMOTE_PORT', 'HTTP_ACCEPT_LANGUAGE')} g.request_raw_data = request.get_data().decode('utf8') g.request_time = datetime.now() g.api_method = request.args['method'] g.api_version = request.args['v'] g.request_param = request_param g.request_form = request.form.to_dict() if request.form else None try: g.request_json = request.get_json() if request.is_json else None except Exception: raise ApiSysExceptions.invalid_json
def update_payments(id): try: if not request.is_json: raise DataValidationError('Invalid payment: Content Type is not json') data = request.get_json(silent=True) message = payment_service.update_payment(id,payment_replacement=data) rc = status.HTTP_200_OK except PaymentNotFoundError as e: message = e.message rc = status.HTTP_404_NOT_FOUND except DataValidationError as e: message = e.message rc = status.HTTP_400_BAD_REQUEST return make_response(jsonify(message), rc) ###################################################################### # UPDATE AN EXISTING PAYMENT PARTIALLY ######################################################################
def update_partial_payments(id): try: if not request.is_json: raise DataValidationError('Invalid payment: Content Type is not json') data = request.get_json(silent=True) message = payment_service.update_payment(id,payment_attributes=data) rc = status.HTTP_200_OK except PaymentNotFoundError as e: message = e.message rc = status.HTTP_404_NOT_FOUND except DataValidationError as e: message = e.message rc = status.HTTP_400_BAD_REQUEST return make_response(jsonify(message), rc) ###################################################################### # DELETE A PAYMENT ######################################################################
def charge_payment(user_id): try: if not request.data: raise DataValidationError('Invalid request: body of request contained no data') if not request.is_json: raise DataValidationError('Invalid request: request not json') data = request.get_json() if data['amount']: if(data['amount'] < 0): raise DataValidationError('Invalid request: Order amount is negative.') else: resp = payment_service.perform_payment_action(user_id,payment_attributes=data) if resp == True: message = {'success' : 'Default payment method for user_id: %s has been charged $%.2f' % (str(user_id), data['amount'])} rc = status.HTTP_200_OK except DataValidationError as e: message = {'error' : e.message} rc = status.HTTP_400_BAD_REQUEST except KeyError as e: message = {'error' : 'Invalid request: body of request does not have the amount to be charged'} rc = status.HTTP_400_BAD_REQUEST return make_response(jsonify(message), rc)
def get_json_arg(name, parser=None, validator=None): jdata = request.get_json(force=True, silent=True) if not request.is_json: raise APIError(ret=1, msg='????????') if jdata is None: raise APIError(ret=1, msg='????????') val = jdata.get(name, None) if val is None: raise APIError(ret=1, msg='????:{}'.format(name)) if parser and callable(parser): try: val = parser(val) except Exception as e: raise APIError(ret=1, msg='????:{}??'.format(name)) if validator and callable(validator): if not validator(val): raise APIError(ret=1, msg='??:{}???'.format(name)) return val
def get_json_arg_default(name, default=None, parser=None, validator=None): jdata = request.get_json(force=True, silent=True) if not request.is_json: raise APIError(ret=1, msg='????????') if jdata is None: raise APIError(ret=1, msg='????????') val = jdata.get(name, None) if val is None: if default is not None: return default raise APIError(ret=1, msg='????:{}'.format(name)) if parser and callable(parser): try: val = parser(val) except Exception as e: raise APIError(ret=1, msg='????:{}??'.format(name)) if validator and callable(validator): if not validator(val): raise APIError(ret=1, msg='??:{}???'.format(name)) return val
def save_topology(name): """Save a topology layout in a file. This method get a json topology from request and puts this in a file. Parameters: name (string): name of the topology to be saved or loaded. Returns: topology (string): topology using json format. """ if not request.is_json: return json.dumps('{"error": "gt was not a JSON request"}'), 400 topology = request.get_json() with open(join(settings.TOPOLOGY_DIR, name + '.json'), 'w') as outfile: json.dump(topology, outfile) return json.dumps({'response': 'Saved'}), 201
def login(): if not request.is_json: return jsonify({"msg": "Missing JSON in request"}), 400 username = request.json.get('username', None) password = request.json.get('password', None) if not username: return jsonify({"msg": "Missing username parameter"}), 400 if not password: return jsonify({"msg": "Missing password parameter"}), 400 if username != 'test' or password != 'test': return jsonify({"msg": "Bad username or password"}), 401 # Identity can be any data that is json serializable access_token = create_access_token(identity=username) return jsonify(access_token=access_token), 200 # Protect a view with jwt_required, which requires a valid access token # in the request to access.
def login(): """Authenticate user and return token """ if not request.is_json: return jsonify({"msg": "Missing JSON in request"}), 400 username = request.json.get('username', None) password = request.json.get('password', None) if not username or not password: return jsonify({"msg": "Missing username or password"}), 400 user = User.query.filter_by(username=username).first() if user is None or not pwd_context.verify(password, user.password): return jsonify({"msg": "Bad credentials"}), 400 access_token = create_access_token(identity=user.id) refresh_token = create_refresh_token(identity=user.id) ret = { 'access_token': access_token, 'refresh_token': refresh_token } return jsonify(ret), 200
def process_image(): try: assert request.is_json assert 'imageUrl' in request.json image_url = request.json['imageUrl'] trusted_source_url, trusted_source_image = None, None trusted_data = check_url(image_url) if trusted_data is not None and 'image_url' in trusted_data: trusted_source_image = trusted_data.image_url trusted_source_url = trusted_data.url result = { 'status': STATUS_OK, 'trusted_source_url': trusted_source_url, 'trusted_source_image': trusted_source_image, 'source_url': image_url } return jsonify(result) except AssertionError: return make_response(jsonify({'status': STATUS_ERROR, 'message': 'malformed request'}), 400)
def set_default(user_id): try: if not request.data: raise DataValidationError('Invalid request: body of request contained no data') if not request.is_json: raise DataValidationError('Invalid request: request not json') data = request.get_json() if data['payment_id']: resp = payment_service.perform_payment_action(user_id,payment_attributes=data) if resp == True: message = { 'success' : 'Payment with id: %s set as default for user with user_id: %s.' % (data['payment_id'], str(user_id)) } rc = status.HTTP_200_OK else: message = { 'error' : 'No Payment with id: %s was found for user with user_id: %s.' % (data['payment_id'], str(user_id)) } rc = status.HTTP_404_NOT_FOUND except DataValidationError as e: message = {'error' : e.message} rc = status.HTTP_400_BAD_REQUEST except KeyError as e: message = {'error' : 'Invalid request: body of request does not have the payment_id'} rc = status.HTTP_400_BAD_REQUEST return make_response(jsonify(message), rc) ###################################################################### # RETRIEVE A PAYMENT ######################################################################
def get_http_info(self): """ Determine how to retrieve actual data, basically if it's JSON or not. """ try: is_json = request.is_json except AttributeError: is_json = request.get_json(silent=True) is not None if is_json: retriever = Breathalyzer.get_json_data else: retriever = Breathalyzer.get_form_data return self.get_http_info_with_retriever(retriever)
def is_json(f): @wraps(f) def decorated_function(*args, **kwargs): if not request.is_json: return jsonify(error=True, status=400, message='Content-Type should be application/json') return f(*args, **kwargs) return decorated_function
def request_from_github(abort_code=418): def decorator(f): """ Decorator that checks if a request is a GitHub hook request """ @wraps(f) def decorated_function(*args, **kwargs): if request.method != 'POST': return 'OK' else: # Do initial validations on required headers if 'X-Github-Event' not in request.headers: abort(abort_code) if 'X-Github-Delivery' not in request.headers: abort(abort_code) if 'X-Hub-Signature' not in request.headers: abort(abort_code) if not request.is_json: abort(abort_code) if 'User-Agent' not in request.headers: abort(abort_code) ua = request.headers.get('User-Agent') if not ua.startswith('GitHub-Hookshot/'): abort(abort_code) request_ip = ip_address(u'{0}'.format(request.remote_addr)) meta_json = requests.get('https://api.github.com/meta').json() hook_blocks = meta_json['hooks'] # Check if the POST request is from GitHub for block in hook_blocks: if ip_address(request_ip) in ip_network(block): break else: g.log.info("Unauthorized attempt to deploy by IP %s" % request_ip) abort(abort_code) return f(*args, **kwargs) return decorated_function return decorator
def with_json_request(func): @wraps(func) def dec(*args, **kwargs): if not request.is_json: return 'Bad json request', 400 return func(*args, **kwargs) return dec
def get_items(slugs): if slugs: items_list = slugs.split(',') elif request.is_json: items_list = request.get_json() # Check if there are items from ugc collection and test their access control ugc_items = [] for item in items_list: if item.startswith('ugc'): ugc_items.append(item) user_oid = current_user.is_authenticated and current_user.id items = fetch_items(items_list) if len(items) == 1 and 'error_code' in items[0]: error = items[0] abort(error['error_code'], error['msg']) else: # Cast items to list if type(items) != list: items = [items] # Check that each of the ugc_items is accessible by the logged in user for ugc_item_id in [i[4:] for i in ugc_items]: for item in items: if item['_id'] == ugc_item_id and item.has_key('owner') and item['owner'] != unicode(user_oid): abort(403, 'You are not authorized to access item ugc.{}'.format(str(item['_id']))) return humanify(items)
def test_instance_attachment_collection_service(test_id): resp = flask.Response(json.dumps({'status': 'failed'})) if request.method == 'POST': if request.is_xhr: file = request.files['attachments'] if file and allowed_file(file.filename): filename = secure_filename(file.filename) dirpath = os.path.join(UPLOAD_FOLDER, str(test_id), 'attachments') os.makedirs(dirpath, exist_ok=True) filepath = os.path.join(dirpath, filename) file.save(filepath) # db: update the attachment for the test where test.id = test_id test = Test.query.get(test_id) test.test_attachment.append(TestAttachment(name=filename, attachment_url=filepath)) db.session.commit() resp = flask.Response(json.dumps({'status': 'success', 'url': filepath})) elif request.method == 'DELETE': if request.is_json: filename = secure_filename(request.json['removedFile']) dirpath = os.path.join(UPLOAD_FOLDER, str(test_id), 'attachments') filepath = os.path.join(dirpath, filename) try: os.remove(filepath) # db: delete the attachment for the test where test.id = test_id TestAttachment.query.filter( (TestAttachment.test_id == test_id) & (TestAttachment.name == filename) ).delete() db.session.commit() resp = flask.Response(json.dumps({'status': 'success', 'url': filepath})) except FileNotFoundError: print('FileNotFound: ', filepath) resp = flask.Response(json.dumps({'status': 'failed', 'url': filepath})) return set_debug_response_header(resp)
def formulation_instance_service(f_id): resp = flask.Response(json.dumps({'status': 'failed'})) if request.method == 'PUT': if request.is_json: p_json_list = request.json['properties'] formulation = Formulation.query.get(f_id) formulation.formulation_property.delete() # fp_rs = Formulation.query.get(f_id).formulation_property # FormulationProperty.query.filter(FormulationProperty.formulation_id == f_id).delete() for p in p_json_list: formulation.formulation_property.append(FormulationProperty( key=p['keyName'], value=p['valueName'] )) db.session.commit() p_list = [] for p in formulation.formulation_property: p_list.append({p.key: p.value}) resp = flask.Response(json.dumps({'status': 'success', 'formulation_id': formulation.id, 'formulation_properties': p_list})) elif request.method == 'DELETE': formulation = Formulation.query.get(f_id) test_count = formulation.test.count() if test_count > 0: resp = flask.Response(json.dumps({'status': 'failed', 'error': 'the tests count of formulation id %d is not 0, ' 'delete tests first'})) else: Formulation.query.filter(Formulation.id == f_id).delete() db.session.commit() return set_debug_response_header(resp)
def process_text(): try: assert request.is_json tweet = request.json['text'] if 'text' in request.json else None url = request.json['pageUrl'] if 'pageUrl' in request.json else None share, total_engaged, resource_trust = None, None, None if url: comments, reaction, share, total_engaged = check_virality(url) resource_trust = check_info_source(url) analysis_result = None if tweet: tweet = tweet if len(tweet) < 1000 else tweet[:1000] analysis_result = compare_tweet_with_storage(tweet) result = { 'status': STATUS_OK, 'data': { 'credibility': analysis_result, 'engaged': total_engaged, 'shares': share, 'site_credibility': resource_trust }, 'source_text': tweet } return jsonify(result) except AssertionError: return make_response(jsonify({'status': STATUS_ERROR, 'message': 'malformed request'}), 400) # except: # return make_response(jsonify({'status': STATUS_ERROR, 'message': 'oops...'}), 500)
def json_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not request.is_json: abort(400, "JSON Required") return f(*args, **kwargs) return decorated_function
def post(self, src_vnf, src_intfs, dst_vnf, dst_intfs): """ A post request to "/v1/chain/<src_vnf>/<src_intfs>/<dst_vnf>/<dst_intfs>" will create a chain between two interfaces at the specified vnfs. The POST data contains the path like this. { "path": ["dc1.s1", "s1", "dc4.s1"]} path specifies the destination vnf and interface and contains a list of switches that the path traverses. The path may not contain single hop loops like: [s1, s2, s1]. This is a limitation of Ryu, as Ryu does not allow the `INPUT_PORT` action! :param src_vnf: Name of the source VNF :type src_vnf: ``str`` :param src_intfs: Name of the source VNF interface to chain on :type src_intfs: ``str`` :param dst_vnf: Name of the destination VNF :type dst_vnf: ``str`` :param dst_intfs: Name of the destination VNF interface to chain on :type dst_intfs: ``str`` :return: flask.Response 200 if set up correctly else 500 also returns the cookie as dict {'cookie': value} 501 if one of the VNF / intfs does not exist :rtype: :class:`flask.Response` """ if request.is_json: path = request.json.get('path') layer2 = request.json.get('layer2', True) else: path = None layer2 = True # check if both VNFs exist if not self.api.manage.check_vnf_intf_pair(src_vnf, src_intfs): return Response(u"VNF %s or intfs %s does not exist" % (src_vnf, src_intfs), status=501, mimetype="application/json") if not self.api.manage.check_vnf_intf_pair(dst_vnf, dst_intfs): return Response(u"VNF %s or intfs %s does not exist" % (dst_vnf, dst_intfs), status=501, mimetype="application/json") try: cookie = self.api.manage.network_action_start(src_vnf, dst_vnf, vnf_src_interface=src_intfs, vnf_dst_interface=dst_intfs, bidirectional=True, path=path, layer2=layer2) resp = {'cookie': cookie} return Response(json.dumps(resp), status=200, mimetype="application/json") except Exception as e: logging.exception(u"%s: Error setting up the chain.\n %s" % (__name__, e)) return Response(u"Error setting up the chain", status=500, mimetype="application/json")
def post(self, src_dc, src_stack, src_vnf, src_intfs, dst_dc, dst_stack, dst_vnf, dst_intfs): """ A post request to "/v1/chain/<src_dc>/<src_stack>/<src_vnf>/<src_intfs>/<dst_dc>/<dst_stack>/<dst_vnf>/<dst_intfs>" will create a chain between two interfaces at the specified vnfs. The POST data contains the path like this. { "path": ["dc1.s1", "s1", "dc4.s1"]} path specifies the destination vnf and interface and contains a list of switches that the path traverses. The path may not contain single hop loops like: [s1, s2, s1]. This is a limitation of Ryu, as Ryu does not allow the `INPUT_PORT` action! :param src_vnf: Name of the source VNF :type src_vnf: ``str`` :param src_intfs: Name of the source VNF interface to chain on :type src_intfs: ``str`` :param dst_vnf: Name of the destination VNF :type dst_vnf: ``str`` :param dst_intfs: Name of the destination VNF interface to chain on :type dst_intfs: ``str`` :return: flask.Response 200 if set up correctly else 500 also returns the cookie as dict {'cookie': value} 501 if vnf / intfs do not exist :rtype: :class:`flask.Response` """ if request.is_json: path = request.json.get('path') layer2 = request.json.get('layer2', True) else: path = None layer2 = True # search for real names real_names = self._findNames(src_dc, src_stack, src_vnf, src_intfs, dst_dc, dst_stack, dst_vnf, dst_intfs) if type(real_names) is not tuple: # something went wrong return real_names container_src, container_dst, interface_src, interface_dst = real_names try: cookie = self.api.manage.network_action_start(container_src, container_dst, vnf_src_interface=interface_src, vnf_dst_interface=interface_dst, bidirectional=True, path=path, layer2=layer2) resp = {'cookie': cookie} return Response(json.dumps(resp), status=200, mimetype="application/json") except Exception as e: logging.exception(u"%s: Error setting up the chain.\n %s" % (__name__, e)) return Response(u"Error setting up the chain", status=500, mimetype="application/json")
def start_trex(): if request.method == 'POST': if request.is_json: req_data = request.get_json(cache=False) if req_data is not None: try: traffic_config = { "pps": int(req_data['input']['pps'].encode("ascii")), "src_n": int(req_data['input']['src_n'].encode("ascii")), "pkts_n": int(req_data['input']['pkts_n'].encode("ascii")), "mac_dest": req_data['input']['mac_dest'].encode("ascii"), "packet_size": int(req_data['input']['packet_size'].encode("ascii")), "mult": int(req_data['input']['mult'].encode("ascii")), } if traffic_config["pps"] > 0 and traffic_config["mac_dest"]: """If # of PPS or MAC addresses is positive""" if not Trex.is_running(): try: start_traffic(traffic_config) except: import traceback print "exception", traceback.format_exc() return responsify('error', get_error_message('trex_not_start')) return responsify('ok', 'start') else: return responsify('error', get_error_message('trex_already_running')) else: """Stop TRex if 0 PPS or MAC addresses received""" stop_trex(True) return responsify('error', get_error_message('pps_must_be_positive')) except (AttributeError, KeyError): return responsify('error', get_error_message('not_json')) except ValueError: return responsify('error', get_error_message('ascii_error')) else: return responsify('error', get_error_message('not_json')) else: return responsify('error', get_error_message('not_json')) else: return responsify("ok", "ok") # Stop sending traffic
def test_collection_service(): resp = flask.Response(json.dumps({'status': 'failed'})) if request.method == 'GET': if request.args.get('formulationID'): formulation_id = request.args['formulationID'] test_list = [] test_rs = Formulation.query.get(formulation_id).test for test_r in test_rs: test_list.append({ 'id': test_r.id, 'name': test_r.name, 'measure_type': test_r.measure_type, 'thickness': test_r.thickness, 'temperature_max': test_r.temperature_max, 'temperature_min': test_r.temperature_min, 'frequency_max': test_r.frequency_max, 'frequency_min': test_r.frequency_min, 'test_type': test_r.test_type, 'data_file_url': test_r.data_file_url, 'date': test_r.date.timestamp(), 'formulation_id': test_r.formulation_id, }) resp = flask.Response(json.dumps({'status': 'success', 'test_list': test_list})) elif request.method == 'POST': if request.is_json: temperature, temperature_min, temperature_max = 0, 0, 0 frequency, frequency_min, frequency_max = 0, 0, 0 measure_type = request.json['measureType'] date_ts = request.json.pop('date', datetime.now().timestamp()) if measure_type == 'temperature': frequency_min = frequency_max = request.json['frequencyMin'] temperature_min = request.json['temperatureMin'] temperature_max = request.json['temperatureMax'] elif measure_type == 'frequency': temperature_min = temperature_max = request.json['temperatureMin'] frequency_min = request.json['frequencyMin'] frequency_max = request.json['frequencyMax'] test = Test(name=request.json['name'], measure_type=measure_type, thickness=request.json['thickness'], temperature_max=temperature_max, temperature_min=temperature_min, frequency_max=frequency_max, frequency_min=frequency_min, test_type=request.json['testType'], formulation_id=request.json['selectedFormulationID'], date=datetime.fromtimestamp(date_ts)) db.session.add(test) db.session.commit() resp = flask.Response(json.dumps({'status': 'success', 'test_id': test.id, 'test_name': test.name})) return set_debug_response_header(resp)
def formulation_collection_service(): resp = flask.Response(json.dumps({'status': 'failed'})) if request.method == 'GET': formulations = [] formulations_rs = Formulation.query.all() for formulation_r in formulations_rs: formulation_properties = [] formulations_property_rs = formulation_r.formulation_property.all() for formulations_property_r in formulations_property_rs: formulation_properties.append({formulations_property_r.key: formulations_property_r.value}) formulations.append({ 'id': formulation_r.id, 'name': formulation_r.name, 'date': formulation_r.date.timestamp(), 'formulation_properties': formulation_properties }) resp = flask.Response(json.dumps({'status': 'success', 'formulations': formulations})) elif request.method == 'POST': if request.is_json: req_json = request.json property_list, property_key_list = [], [] name = req_json.pop('formulationName', 'formulation-%f' % datetime.now().timestamp()) date_ts = req_json.pop('formulationDate', datetime.now().timestamp()) property_key_list = [x for x in req_json if x.startswith('key-')] property_key_list.sort(key=lambda x: int(x.replace('key-', ''))) for fpkey in property_key_list: key = req_json[fpkey] val = req_json['value-%s' % fpkey.split('-', maxsplit=1)[1]] property_list.append((key, val)) # create new formulation in db formulation = Formulation(name=name, date=datetime.fromtimestamp(date_ts)) db.session.add(formulation) db.session.commit() for fpkey, fpval in property_list: formulation.formulation_property.append(FormulationProperty(key=fpkey, value=fpval)) db.session.commit() resp = flask.Response(json.dumps({'status': 'success', 'new_formulation_id': formulation.id, 'new_formulation_name': formulation.name})) return set_debug_response_header(resp)