我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用flask_restful.Resource()。
def post(self): args = self.reqparse.parse_args() if parse_handler==None: return {"result":"fail", "reason":"Please initialize the model first!"}, 400 #parse = parser.SyntaxnetParser(segmenter_model,parser_model,folder=args['syntax_folder']) try: return parse_handler.parse_multi_string(args['strings'],tree=args['tree']) except Exception as e: return {'result': 'fail', "reason": str(e)}, 400 # class SyntaxModelQuery(Resource): # def __init__(self): # self.reqparse = reqparse.RequestParser() # self.reqparse.add_argument('strings', required=True, type=list, help='string is required field, you should input a list of strings', location='json') # self.reqparse.add_argument('syntax_folder', required=False, type=str, default=config.syntaxnetFolder, # location='json') # super(SyntaxModelQuery, self).__init__() # # def post(self,folder): # args = self.reqparse.parse_args() # parse = parser.SyntaxnetParser(folder=args['syntax_folder']) # try: # return parse.parse_multi_string_custom(args['strings'],folder=folder) # except Exception, e: # return {'result': 'fail', "reason": e}, 400
def add_cache_timestamp(resource_method): """Resource method decorator which adds a timestamp to the JSON response. Since responses are cached the timestamp will effectively indicate the time/version of the cached response. """ @wraps(resource_method) def wrapped_resource_method(*args, **kwargs): cache_timestamp = { 'cacheTimestamp': datetime.utcnow().isoformat() + 'Z', } response = resource_method(*args, **kwargs) try: response['meta'].update(cache_timestamp) except KeyError: response['meta'] = cache_timestamp return response return wrapped_resource_method
def get(self, slug=None): if slug: obj = self.resource.model.query.filter(self.resource.model.id == slug) obj = self.resource.has_read_permission(obj).first() if obj: return make_response(jsonify(self.resource.schema(exclude=tuple(self.resource.obj_exclude), only=tuple(self.resource.obj_only)).dump( obj, many=False).data), 200) return make_response(jsonify({'error': True, 'message': 'Resource not found'}), 404) else: objects = self.resource.apply_filters(queryset=self.resource.model.query, **request.args) objects = self.resource.has_read_permission(objects) if '__order_by' in request.args: objects = self.resource.apply_ordering(objects, request.args['__order_by']) resources = objects.paginate(page=self.resource.page, per_page=self.resource.limit) if resources.items: return make_response(jsonify({'success': True, 'data': self.resource.schema(exclude=tuple(self.resource.obj_exclude), only=tuple(self.resource.obj_only)) .dump(resources.items, many=True).data, 'total': resources.total}), 200) return make_response(jsonify({'error': True, 'message': 'No Resource Found'}), 404)
def auth2Token(code): url = "https://accounts.spotify.com/api/token" grant_type = "authorization_code" #get code from UI redirect_uri = "http://localhost:8000/create.html" #redirect_uri = "http:%2F%2Flocalhost:8000%2Fcreate.html" client_id = "0abc049d139f4ee8b465fd9416aa358d" client_secret = "dd477b353e744461ae1b3062f256c952" payload = {'grant_type': grant_type, 'code': code, 'redirect_uri': redirect_uri, 'client_id':client_id, 'client_secret':client_secret} req = requests.post(url, data = payload) res = json.loads (req.content) return [res['access_token'], res['refresh_token']] #class CreateUser(Resource):
def CreateUser(currentEvent, inEvent, isHost): try: # Parse the arguments ''' parser = reqparse.RequestParser() parser.add_argument('currentEvent', type=str) parser.add_argument('inEvent', type=str) parser.add_argument('host', type=str) args = parser.parse_args() ''' db = Database() userID = db.insertUser(currentEvent, inEvent, isHost) return userID except Exception as e: return str(e) #class CreateHost(Resource):
def init_resource_manager(): """ Initializes the resource manager """ global resource_manager global REST_BASE global TRAYS global SPEC resource_manager = ResourceManager(REST_BASE, SPEC,MODE,TRAYS) # If POPULATE is specified in emulator-config.json, INFRAGEN is called to populate emulator (i.e. with Chassi, CS, Resource Blocks, etc) according to specified file try: POPULATE except: pass else: if os.path.exists(POPULATE): with open(POPULATE, 'r') as f: infragen_config = json.load(f) populate(infragen_config.get('POPULATE',10)) resource_dictionary = ResourceDictionary()
def get_configuration(obj, path): """ Helper function to follow the given path starting with the given object Arguments: obj - Beginning object to start searching down. obj should have a get_resource() path - Path of object to get """ try: config = obj.get_resource(path) except (IndexError, AttributeError, TypeError, AssertionError, KeyError) as e: traceback.print_exc() raise PathError("Resource not found: " + str(e.message)) print (config) return config # # If DELETE /redfish/v1/reset, then reset the resource manager #
def post(self): resp = INTERNAL_ERROR req = request.get_json() if req is not None: composed_system = CreateComposedSystem(req) resp = composed_system, 200 else: resp = INTERNAL_ERROR return resp #class ComposedSystem(Resource): # def __init__(self): # pass
def get_configuration(obj, path): """ Helper function to follow the given path starting with the given object Arguments: obj - Beginning object to start searching down. obj should have a get_resource() path - Path of object to get """ try: config = obj.get_resource(path) except (IndexError, AttributeError, TypeError, AssertionError, KeyError) as e: traceback.print_exc() raise PathError("Resource not found: " + str(e.message)) return config # # If DELETE /redfish/v1/reset, then reset the resource manager #
def __init__(self): super(Resource, self).__init__()
def __init__(self): super(Resource, self).__init__() self.campaign_table = DynamoTable('campaigns') self.donation_table = DynamoTable('donations')
def __init__(self): super(Resource, self).__init__() self.table = DynamoTable('campaigns')
def __init__(self): super(Resource, self).__init__() self.campaign_table = DynamoTable('campaigns')
def put(volume_id): """Edits a volume dict based on the given representation. Expects to receive a json payload with a complete new version of the volume to be edited Args: volume_id (str): The id parsed from the URL Returns: tuple: payload, http status code, headers """ manager = app.volume_manager volume = manager.by_id(volume_id) if volume is None: return {'message': 'Not Found'}, 404 if volume.value['state'] != 'ready': return {'message': 'Resource not in ready state, can\'t update.'}, 409 new_volume, errors = VolumeAttributeSchema().load(request.get_json(force=True)) if errors: return {'message': errors}, 400 if volume.value['requested'] == new_volume: return '', 304 volume.value['requested'] = new_volume volume.value['state'] = 'pending' volume = manager.update(volume) if not volume: return {'message': 'Resource changed during transition.'}, 409 result, _ = VolumeSchema().dump(volume) return result, 202, {'Location': app.api.url_for(Volume, volume_id=result['id'])}
def delete(volume_id): """Deletes the volume pointed by the id. Args: volume_id (str): The id parsed from the URL Returns: tuple: payload, http status code, headers """ manager = app.volume_manager target_volume = manager.by_id(volume_id) if target_volume is None: return {'message': 'Not Found'}, 404 if target_volume.value['state'] != 'ready': return {'message': 'Resource not in ready state, can\'t delete.'}, 409 lock = manager.get_lock(volume_id, 'clone') lock.acquire(timeout=0, lock_ttl=10) pending_clones = [] for volume in manager.all()[1]: if volume.value['control']['parent_id'] == volume_id: pending_clones.append(manager.get_id_from_key(volume.key)) if pending_clones: lock.release() return {'message': 'Resource has pending clones, can\'t delete.', 'clones': pending_clones}, 409 target_volume.value['state'] = 'deleting' target_volume = manager.update(target_volume) lock.release() if not target_volume: return {'message': 'Resource changed during transition.'}, 409 result, _ = VolumeSchema().dump(target_volume) return result, 202, {'Location': app.api.url_for(Volume, volume_id=result['id'])}
def __init__(self, schema_dir, resource_schema_class=None, default_base_handler=None, raise_response_validation_errors=False): if resource_schema_class is None: resource_schema_class = FlaskResourceSchema if default_base_handler is None: default_base_handler = flask_restful.Resource super(FlaskRouter, self).__init__( schema_dir, resource_schema_class, default_base_handler, raise_response_validation_errors)
def resource_method_info(self, method) -> tuple: """Return the (class) name of the resource and its type, which is associated with the provided method (function). Used by this class' decorators to determine the name of the instance which subclassed this class, e.g., `WorksCollection`, as well as passing a nice `enum` for determining if it's a collection or singleton resource. Arguments: method (function): Hopefully a method which belongs to an instance that inherited from this class. Raises: InvalidResourceName: If the function belongs to a resource class whose name neither ends in `Collection` nor `Singleton`. Returns: tuple[str, str]: Resource class name, resoure type. """ resource_class_name = self.__class__.__name__ if resource_class_name.endswith('Collection'): resource_type = ResourceType.collection elif resource_class_name.endswith('Singleton'): resource_type = ResourceType.singleton else: raise InvalidResourceName(resource_class_name) return resource_class_name, resource_type
def add_year_restriction(self, function): """Resource method decorator which applies a restriction only allowing results related to a syllabus of one year of age or older. This only works for HTTP GET methods on collection resources. Other methods will be unaffected entirely by this decorator. """ method = function.__name__ __, resource_type = self.resource_method_info(function) if method == "get" and resource_type == ResourceType.collection: @wraps(function) def new_function_to_replace_old(*args, **kwargs): request_args = { 'useOldIndex': True } request_args.update(flask.request.args) flask.request.args = ImmutableMultiDict(request_args) return function(*args, **kwargs) return new_function_to_replace_old else: return function # TODO, FIXME: bad, remove this ASAP # (this is here as filler because we haven't gotten # to the elasticsearch bit yet)
def test_custom_role_limits(self): def before_create_app(): """Create a resource which uses overrides for role limits. """ class TestResource(Resource): decorators = [ limits.limiter.limit( limits.RoleBasedLimits(free='1 per day'), ), ] def get(self): return {'lol': 'lol!'} api.add_resource(TestResource, '/test') client = setup_client(before_create_app) response = client.get('/v1/test') data = json.loads(response.get_data(as_text=True)) assert 'lol' in data response = client.get('/v1/test') assert response.status_code == 429
def get(self, slug=None): if slug: obj = self.resource.model.query.filter(self.resource.model.id == slug) obj = self.resource.has_read_permission(obj).first() if obj: return make_response(jsonify(self.resource.schema(exclude=tuple(self.resource.obj_exclude), only=tuple(self.resource.obj_only)).dump( obj, many=False).data), 200) return make_response(jsonify({'error': True, 'message': 'Resource not found'}), 404) else: objects = self.resource.apply_filters(queryset=self.resource.model.query, **request.args) objects = self.resource.has_read_permission(objects) if '__order_by' in request.args: objects = self.resource.apply_ordering(objects, request.args['__order_by']) if '__export__' in request.args and self.resource.export is True: objects = objects.paginate(page=self.resource.page, per_page=self.resource.max_export_limit) return make_response_from_records( self.resource.schema(exclude=tuple(self.resource.obj_exclude), only=tuple(self.resource.obj_only)) .dump(objects.items, many=True).data, 'csv', 200, self.resource.model.__name__) resources = objects.paginate(page=self.resource.page, per_page=self.resource.limit) if resources.items: return make_response(jsonify({'success': True, 'data': self.resource.schema(exclude=tuple(self.resource.obj_exclude), only=tuple(self.resource.obj_only)) .dump(resources.items, many=True).data, 'total': resources.total}), 200) return make_response(jsonify({'error': True, 'message': 'No Resource Found'}), 404)
def patch(self, slug): obj = self.resource.model.query.get(slug) if not obj: return make_response(jsonify({'error': True, 'message': 'Resource not found'}), 404) try: data, status = self.resource.patch_resource(obj) except (SQLIntegrityError, SQlOperationalError) as e: db.session.rollback() e.message['error'] = True return make_response(jsonify(e.message), e.status) return make_response(jsonify(data), status)
def delete(self, slug): obj = self.resource.model.query.get(slug) if obj: if self.resource.has_delete_permission(obj): db.session.delete(obj) db.session.commit() return make_response(jsonify({}), 204) else: return make_response( jsonify({'error': True, 'message': 'Forbidden Permission Denied To Delete Resource'}), 403) return make_response(jsonify({'error': True, 'message': 'Resource not found'}), 404)
def get(self, shot_id): """ Resource to retrieve the casting of a given shot. """ shot = shots_service.get_shot(shot_id) if not permissions.has_manager_permissions(): user_service.check_has_task_related(shot["project_id"]) return breakdown_service.get_casting(shot_id)
def put(self, shot_id): """ Resource to allow the modification of assets linked to a shot. """ casting = request.json permissions.check_manager_permissions() return breakdown_service.update_casting(shot_id, casting)
def delete(self, pk): obj = self.query_one(pk) self.abort(obj) db.session.delete(obj) db.session.commit() return {'message': 'Resource was deleted successfully'}
def abort(self, obj, status_code=404, message='Resource was not found'): if obj is None: flask_abort(status_code, message)
def __init__(self, *args, **kwargs): BaseResource.__init__(self, FlaskRequestHelper(), *args, **kwargs) flask_restful.Resource.__init__(self) self._request_helper.parse(flask_restful.request, self.REQUEST_ARGS) # Need to specify these again because for some reason Flask MethodView doesn't pick up # the class methods inherited from BaseResource
def delete(self,ident): return 'PUT is not a valid command', 202 # Resource Zone Collection API
def delete(self,ident): return 'PUT is not a valid command', 202 # Resource Block Collection API
def get(self): fmlogging.debug("Received GET request for all resources.") resp_data = {} env_name = request.args.get('env_name') all_resources = '' if env_name: env_obj = env_db.Environment().get_by_name(env_name) if env_obj: all_resources = res_db.Resource().get_resources_for_env(env_obj.id) resp_data['data'] = [res_db.Resource.to_json(res) for res in all_resources] response = jsonify(**resp_data) response.status_code = 200 return response else: message = ("Environment with name {env_name} does not exist").format(env_name=env_name) fmlogging.debug(message) resp_data = {'error': message} response = jsonify(**resp_data) response.status_code = 404 return response else: all_resources = res_db.Resource().get_all() resp_data['data'] = [res_db.Resource.to_json(res) for res in all_resources] response = jsonify(**resp_data) response.status_code = 200 return response
def get(self, resource_id): fmlogging.debug("Received GET request for resource %s" % resource_id) resp_data = {} response = jsonify(**resp_data) resource = res_db.Resource().get(resource_id) if resource: resp_data['data'] = res_db.Resource.to_json(resource) response = jsonify(**resp_data) response.status_code = 200 else: response.status_code = 404 return response
def add_mocks_to_method(self, resource_method): """Resource method decorator which supplies a mock response if a client specifies `mock=True`. Please see `osp_api.mocks`. A naming convention is used to automatically find and use the appropriate mock as a response. Warning: Mocks bypass any authentication (and possibly other decorators, too!). """ resource_class_name, resource_type = self.resource_method_info( resource_method, ) if resource_type == ResourceType.collection: index_before_suffix = -10 mock_suffix = '_collection' elif resource_type == ResourceType.singleton: index_before_suffix = -9 mock_suffix = '_singleton' # Get the "resource name," e.g., "TopWorksCollection" becomes # "top-works_collection." resource_name = '' for i, letter in enumerate(resource_class_name[:index_before_suffix]): if i == 0: resource_name += letter.lower() elif letter.isupper(): resource_name += '-' + letter.lower() else: resource_name += letter http_method = resource_method.__name__ mock_name = resource_name + mock_suffix + '_' + http_method @wraps(resource_method) def wrapped_resource_method(*args, **kwargs): """Replace the existing method with this one.""" if bool(flask.request.args.get('mock')): mock_response = utils.load_json_mock(mock_name) return mock_response else: return resource_method(*args, **kwargs) return wrapped_resource_method # FIXME: eventually this will be redone to tell ElasticSearch to use # an index consisting of records a year of age or older. The age is # measured by class year (when it was taught in a syllabus) and NOT # simply when it was added to our database.
def DeleteComposedSystem(ident): rb = g.rest_base resource_ids={'Processors':[],'Memory':[],'SimpleStorage':[],'EthernetInterfaces':[]} # Verify if the System exists and if is of type - "SystemType": "Composed" if ident in members: if members[ident]['SystemType'] == 'Composed': # Remove Links to Composed System and change CompositionState (to 'Unused') in associated Resource Blocks for block in members[ident]['Links']['ResourceBlocks']: block = block['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','') resource_blocks[block]['Links']['ComputerSystems'] for index, item in enumerate(resource_blocks[block]['Links']['ComputerSystems']): if resource_blocks[block]['Links']['ComputerSystems'][index]['@odata.id'].replace(rb + 'Systems/','') == ident: del resource_blocks[block]['Links']['ComputerSystems'][index] resource_blocks[block]['CompositionStatus']['CompositionState'] = 'Unused' for device_type in resource_ids.keys(): for device in resource_blocks[block][device_type]: resource_ids[device_type].append(device) # Remove links to Processors, Memory, SimpleStorage, etc for device_type in resource_ids.keys(): for device in resource_ids[device_type]: if device_type == 'Processors': device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','') del processors[ident][device_back] if processors[ident]=={}: del processors[ident] elif device_type == 'Memory': device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','') del memory[ident][device_back] if memory[ident]=={}: del memory[ident] elif device_type == 'SimpleStorage': device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','') del simplestorage[ident][device_back] if simplestorage[ident]=={}: del simplestorage[ident] elif device_type == 'EthernetInterfaces': device_back = device['@odata.id'].replace(rb + 'CompositionService/ResourceBlocks/','') del ethernetinterfaces[ident][device_back] if ethernetinterfaces[ident]=={}: del ethernetinterfaces[ident] # Remove Composed System from System list del members[ident] resp = 200 else: # It is not a Composed System and therefore cannot be deleted as such" return INTERNAL_ERROR # return resp