我们从 Python 开源项目中提取了以下 11 个代码示例，用于说明如何使用 flask.request.content_type 属性。
def post(current_user, bucket_id):
    """
    Store a new item in a Bucket.

    The request content type must be exactly ``application/json``. The JSON
    payload must contain a non-empty ``name``; ``description`` is optional.
    The name is lower-cased before it is handed to ``BucketItem``.

    :param current_user: User
    :param bucket_id: Bucket Id
    :return: Http Response
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    payload = request.get_json()
    name = payload.get('name')
    if not name:
        return response('failed', 'No name or value attribute found', 401)

    # Look the Bucket up through the user; a missing Bucket ends the request.
    user_bucket = get_user_bucket(current_user, bucket_id)
    if user_bucket is None:
        return response('failed', 'User has no Bucket with Id ' + bucket_id, 202)

    # Build the item and persist it.
    description = payload.get('description', None)
    new_item = BucketItem(name.lower(), description, user_bucket.id)
    new_item.save()
    return response_with_bucket_item('success', new_item, 200)
def validate_jpeg_binary(func):
    """
    Decorator: only call ``func`` when the request looks like a JPEG upload.

    The wrapper first compares the request content type with ``image/jpeg``
    and then passes the raw request data to ``imghdr.test_jpeg``. If either
    check fails, an ``ErrorResponseJson`` response is returned and ``func``
    is not called.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        declared_type = request.content_type
        if declared_type != "image/jpeg":
            message = "invalid content type: {}".format(declared_type)
            return ErrorResponseJson(message).make_response()

        detected = imghdr.test_jpeg(request.data, None)
        if detected != "jpeg":
            return ErrorResponseJson("invalid jpeg data").make_response()

        return func(*args, **kwargs)

    return wrapper
def builtin_jq():
    """
    Builtin program: `jq`.

    Calls ``jq`` with the filter taken from the ``program`` query argument
    (``"."`` when absent) and the raw request body, and returns the result
    with the content type set to ``application/json``. An
    ``InvalidJQFilter`` error is reported as a JSON message with status 400.
    """
    jq_filter = request.args.get('program', ".")
    body = request.data
    try:
        output = jq(jq_filter, body)
        resp = make_response(output)
        resp.content_type = 'application/json'
    except InvalidJQFilter as exception:
        return jsonify(message=str(exception)), 400
    else:
        return resp
def reset_password(current_user):
    """
    Reset the password of the given user.

    The request content type must be exactly ``application/json`` and the
    payload must carry ``oldPassword``, ``newPassword`` and
    ``passwordConfirmation``. The old password is checked against
    ``current_user.password`` before the new one is validated and applied.

    :param current_user: User whose password is verified and reset
    :return: Result of the ``response`` helper
    """
    if request.content_type != "application/json":
        return response('failed', 'Content type must be json', 400)

    payload = request.get_json()
    old_password = payload.get('oldPassword')
    new_password = payload.get('newPassword')
    password_confirmation = payload.get('passwordConfirmation')

    # All three attributes are required and must be non-empty.
    if not (old_password and new_password and password_confirmation):
        return response('failed', "Missing required attributes", 400)

    # The old password has to match before anything else is looked at.
    if not bcrypt.check_password_hash(current_user.password, old_password.encode('utf-8')):
        return response('failed', "Incorrect password", 401)

    if new_password != password_confirmation:
        return response('failed', 'New Passwords do not match', 400)

    if len(new_password) <= 4:
        return response('failed', 'New password should be greater than four characters long', 400)

    current_user.reset_password(new_password)
    return response('success', 'Password reset successfully', 200)


# Register classes as views
def edit_bucket(current_user, bucket_id):
    """
    Rename one of the user's Buckets.

    The request content type must be exactly ``application/json`` and the
    payload must contain a non-empty ``name``. The bucket Id must be
    convertible to an integer, and the Bucket is looked up among the
    Buckets of the user loaded by ``current_user.id``.

    :param current_user: Current User
    :param bucket_id: Bucket Id
    :return: Http Json response
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be json', 202)

    new_name = request.get_json().get('name')
    if not new_name:
        return response('failed', 'No attribute or value was specified, nothing was changed', 400)

    # Only the conversion matters here; the converted value is not used.
    try:
        int(bucket_id)
    except ValueError:
        return response('failed', 'Please provide a valid Bucket Id', 400)

    owner = User.get_by_id(current_user.id)
    user_bucket = owner.buckets.filter_by(id=bucket_id).first()
    if not user_bucket:
        return response('failed', 'The Bucket with Id ' + bucket_id + ' does not exist', 404)

    user_bucket.update(new_name)
    return response_for_created_bucket(user_bucket, 201)
def builtin_echo():
    """
    Builtin program: `echo`.

    Sends the raw request body back, with the response content type set to
    the content type of the request.
    """
    echoed = make_response(request.data)
    echoed.content_type = request.content_type
    return echoed
def report(node):
    """
    Record a provision report for ``node`` and commit it.

    Unless the node is in maintenance mode, the ``version`` query argument
    is parsed as an integer and stored as the node's active config version;
    a missing or non-integer value aborts with 400. The request content
    type must be exactly ``application/json``, otherwise the request aborts
    with 400. The raw request body is stored as the provision's ignition
    config.

    :param node: node object the report refers to
    :return: ``Response('ok')`` with the ``application/json`` mimetype
    """
    if not node.maintenance_mode:
        try:
            node.active_config_version = int(request.args.get('version'))
        except (ValueError, TypeError):
            return abort(400)

    if request.content_type != 'application/json':
        return abort(400)

    session = models.db.session

    record = models.Provision()
    record.node = node
    record.config_version = node.active_config_version
    record.ignition_config = request.data
    # The iPXE config is rendered only when the node runs its target version.
    if node.target_config_version == node.active_config_version:
        record.ipxe_config = config_renderer.ipxe.render(node, request.url_root)

    session.add(record)
    session.add(node)

    # Clear the wipe-on-next-boot flag on the node's disks.
    node.disks.update({
        models.Disk.wipe_next_boot: False
    })

    cluster = node.cluster
    if cluster.are_etcd_nodes_configured:
        cluster.assert_etcd_cluster_exists = True
        session.add(cluster)

    session.commit()
    return Response('ok', mimetype='application/json')
def image():
    """
    Pass the raw request body to ``process_image`` and return its result.

    The request content type and the body length are printed first.
    """
    print('contentType', request.content_type)
    raw_body = request.data
    print('data length', len(raw_body))
    return process_image(raw_body)
def default_after_request(param):
    """
    Log request and response details, then hand the response back.

    Sets ``g.response_time`` to the current time, builds a dictionary from
    values stored on ``g`` and from attributes of ``param``, and logs it at
    error level when the status code is 400 or above, otherwise at debug
    level.

    :param param: response object whose attributes are logged
    :return: the same ``param`` object
    """
    response_param = {
        'charset': param.charset,
        'content_length': param.content_length,
        'content_type': param.content_type,
        'content_encoding': param.content_encoding,
        'mimetype': param.mimetype,
        'response': getattr(g, 'result', None),
        'status': param.status,
        'status_code': param.status_code,
    }

    g.response_time = datetime.now()
    time_consuming = str(g.response_time - g.request_time)

    log_info = {
        'api_method': g.get('api_method'),
        'api_version': g.get('api_version'),
        'request_param': g.get('request_param'),
        'request_form': g.get('request_form'),
        'querystring': g.get('request_param')['query_string'],
        'request_json': g.get('request_json'),
        'response_param': response_param,
        'request_raw_data': g.request_raw_data,
        'request_time': g.get('request_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
        'response_time': g.get('response_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
        'time_consuming': time_consuming,
    }

    # Responses with status 400 and above are logged as errors.
    write_log = current_app.logger.error if param.status_code >= 400 else current_app.logger.debug
    write_log(log_info)
    return param
def edit_item(current_user, bucket_id, item_id):
    """
    Edit an item with a valid Id.

    The request content type must be exactly ``application/json``, and the
    Bucket the item belongs to is looked up through ``get_user_bucket``.
    The name of the item must be present in the payload; the description
    is optional.

    :param current_user: User
    :param bucket_id: Bucket Id
    :param item_id: Item Id
    :return: Response of Edit Item
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    # Only the conversion matters here; the converted value is not used.
    try:
        int(item_id)
    except ValueError:
        return response('failed', 'Provide a valid item Id', 202)

    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        return response('failed', 'User has no Bucket with Id ' + bucket_id, 202)

    # Get the item
    item = bucket.items.filter_by(id=item_id).first()
    if not item:
        abort(404)

    # Validate the payload BEFORE reading from it. A JSON ``null`` body makes
    # get_json() return None, and calling .get() on it would raise
    # AttributeError instead of producing the intended error response.
    request_json_data = request.get_json()
    if not request_json_data:
        return response('failed', 'No attributes specified in the request', 401)

    item_new_name = request_json_data.get('name')
    if not item_new_name:
        return response('failed', 'No name or value attribute found', 401)
    item_new_description = request_json_data.get('description', None)

    # Update the item record
    item.update(item_new_name, item_new_description)
    return response_with_bucket_item('success', item, 200)
def client_pushed():
    """
    Handle a pushed scan request and dispatch it by payload shape.

    The payload is read as JSON when the content type is exactly
    ``application/json``, otherwise from the submitted form. An empty
    payload or an unrecognised shape aborts with 400. Recognised shapes:

    * ``eventType``/``EventType`` equal to ``Test``: logged only.
    * ``eventType`` equal to ``Manual``: scans ``filepath`` unless it is
      ignored, and returns a message describing the outcome.
    * ``Movie`` / ``movie``: starts a ``radarr`` scan.
    * ``Series``: starts a ``sonarr`` scan.
    * ``series`` together with ``episodeFile``: starts a ``sonarr_dev`` scan.

    :return: a status message string; ``"OK"`` for all non-manual requests
    """
    if request.content_type == 'application/json':
        data = request.get_json(silent=True)
    else:
        data = request.form.to_dict()

    if not data:
        logger.error("Invalid scan request from: %r", request.remote_addr)
        abort(400)
    logger.debug("Client %r request dump:\n%s", request.remote_addr, json.dumps(data, indent=4, sort_keys=True))

    if ('eventType' in data and data['eventType'] == 'Test') or ('EventType' in data and data['EventType'] == 'Test'):
        logger.info("Client %r made a test request, event: '%s'", request.remote_addr, 'Test')
    elif 'eventType' in data and data['eventType'] == 'Manual':
        logger.info("Client %r made a manual scan request for: '%s'", request.remote_addr, data['filepath'])
        final_path = utils.map_pushed_path(conf.configs, data['filepath'])
        # ignore this request?
        ignore, ignore_match = utils.should_ignore(final_path, conf.configs)
        if ignore:
            logger.info("Ignored scan request for '%s' because '%s' was matched from SERVER_IGNORE_LIST", final_path,
                        ignore_match)
            return "Ignoring scan request because %s was matched from your SERVER_IGNORE_LIST" % ignore_match
        if start_scan(final_path, 'manual', 'Manual'):
            return "'%s' was added to scan backlog." % final_path
        else:
            return "Error adding '%s' to scan backlog." % data['filepath']
    elif 'Movie' in data:
        logger.info("Client %r scan request for movie: '%s', event: '%s'", request.remote_addr,
                    data['Movie']['FilePath'], data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Movie']['FilePath'])
        start_scan(final_path, 'radarr', data['EventType'])
    elif 'movie' in data:
        # new radarr webhook
        path = os.path.join(data['movie']['folderPath'], data['movieFile']['relativePath'])
        logger.info("Client %r scan request for movie: '%s', event: '%s'", request.remote_addr, path,
                    "Upgrade" if data['isUpgrade'] else data['eventType'])
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'radarr', "Upgrade" if data['isUpgrade'] else data['eventType'])
    elif 'Series' in data:
        logger.info("Client %r scan request for series: '%s', event: '%s'", request.remote_addr,
                    data['Series']['Path'], data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Series']['Path'])
        start_scan(final_path, 'sonarr', data['EventType'])
    elif 'series' in data and 'episodeFile' in data:
        # new sonarr webhook
        # Both keys are tested explicitly: `'series' and 'episodeFile' in data`
        # only checked 'episodeFile', because a non-empty string literal is
        # always truthy.
        path = os.path.join(data['series']['path'], data['episodeFile']['relativePath'])
        logger.info("Client %r scan request for series: '%s', event: '%s'", request.remote_addr, path,
                    "Upgrade" if data['isUpgrade'] else data['eventType'])
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'sonarr_dev', "Upgrade" if data['isUpgrade'] else data['eventType'])
    else:
        logger.error("Unknown scan request from: %r", request.remote_addr)
        abort(400)

    return "OK"


############################################################
# MAIN
############################################################