我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask_restful.reqparse.RequestParser()。
def get(self):
    """Finish a GitHub OAuth login for the authorization code in the request.

    Reads the required ``code`` field from the JSON body, exchanges it for an
    access token, fetches the GitHub user record and looks that user up in
    ``db.user`` by the GitHub id.

    Returns:
        A redirect to the signup page when the user is not registered yet
        (a record is stored in ``db.tempUser`` first); otherwise a JSON
        payload describing the logged-in user, after ``session['userID']``
        has been set.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("code", type=str, required=True, location="json")
    code = parser.parse_args()["code"]

    response = json.loads(requests.post(
        "https://github.com/login/oauth/access_token",
        json={"code": code,
              "client_id": GITHUB_CLIENT_ID,
              "client_secret": GITHUB_CLIENT_SECRET}).text)
    accessToken = response["access_token"]
    githubUser = json.loads(requests.get(
        "https://api.github.com/user",
        data={"access_token": accessToken}).text)

    dbUser = db.user.find_one({"_id": githubUser['id']})
    if dbUser is None:
        # NOTE(review): GitHub user payloads usually expose "login"/"name";
        # confirm that a "username" key is really present.
        newUser = {"_id": githubUser["id"],
                   "name": githubUser['username'],
                   "joinDate": datetime.datetime.today().strftime('%Y-%m-%d')}
        db.tempUser.insert_one(newUser)
        # The id is taken verbatim from the GitHub payload and may not be a
        # string, so convert it before building the URL fragment.
        return redirect(WEBSITE_DOMAIN + "/signup.html#" + str(newUser["_id"]))

    session['userID'] = dbUser["_id"]
    # Return the record that was looked up above (``user`` is not defined here).
    return jsonify({
        "loggedIn": True,
        "user": dbUser
    })
def post(self):
    """Promote a temporary user to a full user attached to a school.

    Expects ``schoolID`` and ``userID`` in the JSON body. Aborts with 400
    when either the temporary user or the school cannot be found.

    Returns:
        A JSON response containing the stored user document, with HTTP
        status 201.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("schoolID", type=str, required=True, location="json")
    parser.add_argument("userID", type=str, required=True, location="json")
    args = parser.parse_args()

    user = db.tempUser.find_one({"_id": args["userID"]})
    if user is None:
        abort(400)
    school = db.school.find_one({"_id": args["schoolID"]})
    if school is None:
        abort(400)

    user["schoolID"] = args["schoolID"]
    db.user.insert_one(user)
    session['userID'] = user["_id"]

    # jsonify() refuses a mix of positional and keyword arguments, so the
    # status code is set on the response object instead.
    response = jsonify(user)
    response.status_code = 201
    return response
def get_request_params(params):
    """
    Build a request parser from argument specs and return the parsed values.

    :param params: sequence of specs, each indexable as
                   (name, type, required, location)
    :type params: list
    :return: the result of ``RequestParser.parse_args()``

    eg:
        params = [
            ("name", str, True, ["headers", "cookies"]),
            ("password", str, True, ["args", "form"])
        ]
    """
    request_parser = reqparse.RequestParser()
    for spec in params:
        arg_name = spec[0]
        arg_type = spec[1]
        is_required = spec[2]
        locations = spec[3]
        request_parser.add_argument(arg_name, type=arg_type,
                                    required=is_required, location=locations)
    return request_parser.parse_args()
def __init__(self, select_list_options, data_list_options, radio_buttons_options, **kwargs):
    """Parse the submitted form fields and keep the option lists.

    The parser is stored on ``self.parser``; every parsed field whose value
    is not ``None`` ends up in ``self.user_inputs``. The three option
    arguments are stored as ``select_list``, ``data_list`` and
    ``radio_buttons``.
    """
    def to_date(raw):
        # Dates are submitted as YYYY-MM-DD strings.
        return dt.datetime.strptime(raw, "%Y-%m-%d").date()

    def to_checked(raw):
        # A checkbox counts as checked only when its value is exactly 'on'.
        return raw == 'on'

    self.parser = reqparse.RequestParser()
    add = self.parser.add_argument
    add('text_field')
    add('number_field', type=int)
    add('date_field', type=to_date)
    add('select_list')
    add('data_list')
    add('checkbox', default=False, type=to_checked)
    add('radio_button')
    add('slider', type=int)
    add('comment_field')
    add('email')
    add('password', type=pbkdf2_sha256.hash)

    provided = {}
    for field, value in self.parser.parse_args().items():
        if value is not None:
            provided[field] = value
    self.user_inputs = provided

    self.select_list = select_list_options
    self.data_list = data_list_options
    self.radio_buttons = radio_buttons_options
    super().__init__()
def __init__(self, model, features, confusion_matrix, features_imp, **kwargs):
    """Parse the measurement inputs from the request and store the model data.

    The four ``*_cm`` measurements are parsed as floats. Every parsed value
    that is not ``None`` is kept in ``self.user_inputs``, except the
    ``return_html`` switch, which is stored separately on
    ``self.return_html``.
    """
    self.parser = reqparse.RequestParser()
    self.parser.add_argument('petal_length_cm', type=float)
    self.parser.add_argument('petal_width_cm', type=float)
    self.parser.add_argument('sepal_length_cm', type=float)
    self.parser.add_argument('sepal_width_cm', type=float)
    # NOTE(review): type=bool turns any non-empty string (including "false")
    # into True; confirm whether a stricter boolean parser is wanted here.
    self.parser.add_argument('return_html', type=bool, default=False)
    args = self.parser.parse_args()

    self.return_html = args['return_html']
    # Compare by value: identity comparison against a string literal is
    # implementation-dependent.
    self.user_inputs = {k: v for k, v in args.items()
                        if v is not None and k != 'return_html'}

    self.model = model
    self.features = features
    self.confusion_matrix = confusion_matrix
    self.features_imp = features_imp
    super().__init__()
def post(self):
    ''' process threat-intel and conditionally alert user as threat notification '''
    try:
        threat_parser = reqparse.RequestParser()
        threat_parser.add_argument('score', type=int, location='json')
        threat_parser.add_argument('threat_data', type=json_encode, location='json')
        # The parsed values are not used; parsing only validates the payload.
        threat_parser.parse_args()
        return jsonify(status=200, message='Email delivery success')
    except Exception as e:
        # Any failure is reported back to the caller as a status message.
        return {'status': str(e)}
def post(self):
    """Analyse the posted form field ``text`` and return its emotion names.

    Returns:
        A dict with ``sentic_values`` (names of the non-``None`` sentic
        values) and ``compound_emotions`` (pairs of names).
    """
    form_parser = reqparse.RequestParser()
    form_parser.add_argument('text', location='form', required=True)
    text = form_parser.parse_args()['text']

    analysis = comment_emotions.emotions(text, g)
    raw_sentics = analysis.get_all_sentic_values()
    raw_compounds = analysis.get_compound_emotion()

    sentic_names = [sentic.name for sentic in raw_sentics if sentic is not None]
    compound_names = [(first.name, second.name) for first, second in raw_compounds]
    return {
        'sentic_values': sentic_names,
        'compound_emotions': compound_names
    }
def get(self, zip_code=None):
    """Return either a limited list of zip codes or a single zip code.

    A ``limit`` query argument takes precedence over ``zip_code``. When
    neither is provided the request is aborted with status 400.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('limit', type=int)
    limit = parser.parse_args(strict=True).get('limit')

    if limit is not None:
        found = Zipcode.limit(limit)
        found_count = len(found)
        if found_count < limit:
            # Fewer documents exist than were asked for; log the shortfall.
            app.info_logger.info(
                "Received option to list {limit}"
                " zipcodes but only {quantity} were found".format(
                    limit=limit, quantity=found_count))
        app.info_logger.info(
            "Listing {quantity} zip_codes".format(quantity=found_count))
        return [entry.to_dict() for entry in found], 200

    if zip_code is not None:
        not_found_message = "zip code {zip_code} not found.".format(zip_code=zip_code)
        document = Zipcode.get_or_404(zip_code=zip_code, message=not_found_message)
        app.info_logger.info("Get zip_code: {zip_code}".format(zip_code=zip_code))
        return document.to_dict(), 200

    app.error_logger.error("No limit or zip_code option were found in the request")
    abort(400, message="You must provide the limit or zip_code.")
def __init__(self):
    """Create the Flask app, its API wrapper, a request parser and the SSL context.

    ``conf.ENABLE_REST_SSL`` selects the SSL mode: ``0`` leaves the context
    unset, ``1`` uses a (certificate path, key path) pair, and any other
    value builds an ``ssl.SSLContext`` that requires a peer certificate
    verified against the configured trust file.
    """
    self.__app = Flask(__name__)
    self.__api = Api(self.__app)
    self.__parser = reqparse.RequestParser()
    self.__stub_to_peer_service = None

    # Decide which SSL context to create from the configured SSL mode.
    if conf.ENABLE_REST_SSL == 0:
        self.__ssl_context = None
    elif conf.ENABLE_REST_SSL == 1:
        self.__ssl_context = (conf.DEFAULT_SSL_CERT_PATH, conf.DEFAULT_SSL_KEY_PATH)
    else:
        # Use the constant from the public ``ssl`` module rather than the
        # private ``_ssl`` extension module.
        self.__ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)

        self.__ssl_context.verify_mode = ssl.CERT_REQUIRED
        self.__ssl_context.check_hostname = False

        self.__ssl_context.load_verify_locations(cafile=conf.DEFAULT_SSL_TRUST_CERT_PATH)
        self.__ssl_context.load_cert_chain(conf.DEFAULT_SSL_CERT_PATH, conf.DEFAULT_SSL_KEY_PATH)
def post(self):
    """Parse the posted strings with the module-level ``parse_handler``.

    Returns:
        The handler's result on success, or a ``(payload, 400)`` pair
        describing the failure when the handler is not initialised or the
        parse raises.
    """
    args = self.reqparse.parse_args()
    # Identity test: ``None`` is a singleton and ``==`` can be overridden.
    if parse_handler is None:
        return {"result": "fail", "reason": "Please initialize the model first!"}, 400
    #parse = parser.SyntaxnetParser(segmenter_model,parser_model,folder=args['syntax_folder'])
    try:
        return parse_handler.parse_multi_string(args['strings'], tree=args['tree'])
    except Exception as e:
        return {'result': 'fail', "reason": str(e)}, 400

# class SyntaxModelQuery(Resource):
#     def __init__(self):
#         self.reqparse = reqparse.RequestParser()
#         self.reqparse.add_argument('strings', required=True, type=list, help='string is required field, you should input a list of strings', location='json')
#         self.reqparse.add_argument('syntax_folder', required=False, type=str, default=config.syntaxnetFolder,
#                                    location='json')
#         super(SyntaxModelQuery, self).__init__()
#
#     def post(self,folder):
#         args = self.reqparse.parse_args()
#         parse = parser.SyntaxnetParser(folder=args['syntax_folder'])
#         try:
#             return parse.parse_multi_string_custom(args['strings'],folder=folder)
#         except Exception, e:
#             return {'result': 'fail', "reason": e}, 400
def get(self):
    """Run the quality check for ``url`` on the semiotic layers not excluded.

    Raises:
        ValueError: when an excluded layer is not one of the known layers.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('url', required=True, help='url cannot be blank!')
    parser.add_argument('exclude_semiotic_layer', action='append')
    parser.add_argument('domain')
    parser.add_argument('already_converted', type=inputs.boolean, default=False)
    parser.add_argument('debug', type=inputs.boolean, default=False)  # remove this when in production
    args = parser.parse_args()

    semiotic_quality_flags = {'syntactic', 'semantic', 'pragmatic', 'social'}
    requested_exclusions = args.exclude_semiotic_layer
    excluded = set(requested_exclusions) if requested_exclusions else set()

    # Every excluded layer has to be one of the known layers.
    if not excluded.issubset(semiotic_quality_flags):
        raise ValueError(
            'Invalid semiotic layer. Must be one of: {}.'.format(', '.join(semiotic_quality_flags)))

    active_layers = semiotic_quality_flags - excluded
    return owl_quality(args.url, active_layers, args.domain,
                       already_converted=args.already_converted, debug=args.debug)
def get(self, task_id):
    ''' Endpoint for getting information about current state of task

    The ``order`` query argument selects how answers are sorted: ``latest``
    (the default, newest first) or ``shortest`` (by length). An unknown task
    or an unknown order yields a 400 response.
    '''
    order_queries = {
        'latest': Answer.query
                        .filter_by(task_id=task_id)
                        .order_by(Answer.created_at.desc()),
        'shortest': Answer.query.filter_by(task_id=task_id)
                          .order_by(Answer.length)
    }
    parser = reqparse.RequestParser()
    parser.add_argument('order', location='args', default='latest')
    args = parser.parse_args()

    task = manager.get_task(task_id)
    if not task:
        return make_response("Tried to query an invalid task.", 400)

    # Reject unsupported orderings explicitly instead of failing with a
    # KeyError on the lookup.
    order_query = order_queries.get(args.order)
    if order_query is None:
        return make_response("Tried to query with an invalid order.", 400)

    answers = [answer.to_dict() for answer in order_query.all()]
    return jsonify({
        'task': task,
        'answers': answers
    })
def post(self):
    '''Endpoint for getting auth token for existing user'''
    parser = reqparse.RequestParser()
    parser.add_argument('email', required=True)
    parser.add_argument('password', required=True)
    args = parser.parse_args()

    # The stored hash is the hex SHA-256 digest of the UTF-8 password.
    password_hash = hashlib.sha256(args.password.encode('utf-8')).hexdigest()

    credentials_match = (User.email == args.email) & (User.password_hash == password_hash)
    user = db.session.query(User).filter(credentials_match).first()
    if user is None:
        return redirect('/?message=%s' % 'Could not find account.')

    # allocate and maintain session token
    token = os.urandom(256)
    tokens[token] = args.email
    session['token'] = token
    session['username'] = user.username
    return redirect('/')
def post(self):
    """Parse the integer air-conditioner settings, publish them and echo them back."""
    setting_names = (
        'node_select',
        'wind_directtion',
        'wind_speed',
        'working_model',
        'temp_setting',
        'switch',
    )
    parser = reqparse.RequestParser()
    for setting in setting_names:
        parser.add_argument(setting, type=int)

    settings = parser.parse_args()
    print(settings)
    mqtt_pub_air_con(settings)
    return settings
def post(self):
    """Send the on/off command for the requested unit.

    ``tianshuoSwitch`` equal to ``'1'`` sends the value 9 and ``'0'`` sends
    the value 1; any other switch value sends nothing.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('tianshuoSwitch', type=str)
    parser.add_argument('tianshuoNo', type=int)
    args = parser.parse_args()
    print(args)

    unit_no = args['tianshuoNo']
    switch_state = args['tianshuoSwitch']

    # switch state -> (last value of the command, message printed afterwards)
    commands = {
        '1': (9, "{0} switch on!"),
        '0': (1, "{0} switch off!"),
    }
    if switch_state in commands:
        value, message = commands[switch_state]
        output_hex = calc_modus_hex_str_to_send(unit_no, 6, 0, 0, 0, value)
        rs485_socket_send(output_hex)
        print(message.format(unit_no))
def get(self):
    """Return the node addresses that belong to the requested barn.

    Reads ``barnNo`` from the request; ``airconSwitch`` is accepted by the
    parser but not used. The addresses come back in ascending order.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('airconSwitch', type=str)
    parser.add_argument('barnNo', type=str)
    args = parser.parse_args()
    print(args)

    barnNo = args['barnNo']

    nodes = db.session.query(LoraNode.node_addr).join(
        GrainBarn, GrainBarn.id == LoraNode.grain_barn_id).filter(
        GrainBarn.barn_no == barnNo).order_by(LoraNode.node_addr.asc()).all()
    print("nodes are:", nodes)
    return nodes
def get(self):
    """Return the node addresses of a barn and log unexpected switch values.

    Reads ``barnNo`` and ``airconSwitch`` from the request. Switch values
    ``'1'`` and ``'0'`` currently trigger no action; anything else is
    printed.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('airconSwitch', type=str)
    parser.add_argument('barnNo', type=str)
    args = parser.parse_args()
    print(args)

    barn_no = args['barnNo']
    switch_state = args['airconSwitch']

    barn_nodes_query = (
        db.session.query(LoraNode.node_addr)
        .join(GrainBarn, GrainBarn.id == LoraNode.grain_barn_id)
        .filter(GrainBarn.barn_no == barn_no)
        .order_by(LoraNode.node_addr.asc())
    )
    nodes = barn_nodes_query.all()
    print("nodes are:", nodes)

    # '1' and '0' are recognised but have no behaviour attached yet.
    if switch_state not in ('1', '0'):
        print('airconSwitch is :', switch_state)
    return nodes
def post(self):
    """Send an alarm email built from the posted fields and confirm it."""
    parser = reqparse.RequestParser()
    for field in ('user_email', 'subject', 'user_name', 'alarm_msg'):
        parser.add_argument(field, type=str)
    args = parser.parse_args()

    # Surround the parsed arguments with a visible banner in the output.
    spacer = '\n' * 5
    banner = '**email**' * 5
    print(spacer)
    print(banner)
    print(args)
    print(banner)
    print(spacer)

    send_email(args['user_email'], args['subject'], 'mail/email_alarm',
               user_name=args['user_name'], alarm_msg=args['alarm_msg'])
    return 'alarm email has sended successfuly!'
def get(self):
    """API endpoint to retrieve a list of links to transaction outputs.

    Requires a ``public_key`` argument and accepts an optional ``spent``
    filter; unknown arguments are rejected.

    Returns:
        A :obj:`list` of dicts with ``transaction_id`` and ``output_index``.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('public_key', type=parameters.valid_ed25519,
                        required=True)
    parser.add_argument('spent', type=parameters.valid_bool)
    args = parser.parse_args(strict=True)

    bigchain_pool = current_app.config['bigchain_pool']
    with bigchain_pool() as bigchain:
        links = []
        for output in bigchain.get_outputs_filtered(args['public_key'],
                                                    args['spent']):
            links.append({'transaction_id': output.txid,
                          'output_index': output.output})
        return links
def get(self):
    """API endpoint to get the related blocks for a transaction.

    Return:
        A ``list`` of ``block_id``s that contain the given transaction. When
        a ``status`` argument is provided, only blocks with that status are
        listed.
    """
    allowed_statuses = [Bigchain.BLOCK_VALID, Bigchain.BLOCK_INVALID,
                        Bigchain.BLOCK_UNDECIDED]
    parser = reqparse.RequestParser()
    parser.add_argument('transaction_id', type=str, required=True)
    parser.add_argument('status', type=str, case_sensitive=False,
                        choices=allowed_statuses)
    args = parser.parse_args(strict=True)
    wanted_status = args['status']

    pool = current_app.config['bigchain_pool']
    with pool() as bigchain:
        statuses_by_block = bigchain.get_blocks_status_containing_tx(
            args['transaction_id'])
        matching_blocks = []
        for block_id, block_status in statuses_by_block.items():
            # Without a status filter every block qualifies.
            if not wanted_status or block_status == wanted_status:
                matching_blocks.append(block_id)

    return matching_blocks
def get_arguments(self):
    """Parse the password change request.

    Returns:
        The tuple ``(old_password, password, password_2)``; all three
        arguments are required.
    """
    required_fields = (
        ("old_password", "Old password is missing."),
        ("password", "New password is missing."),
        ("password_2", "New password confirmation is missing."),
    )
    parser = reqparse.RequestParser()
    for field, missing_message in required_fields:
        parser.add_argument(field, required=True, help=missing_message)

    parsed = parser.parse_args()
    return (
        parsed["old_password"],
        parsed["password"],
        parsed["password_2"]
    )
def get_arguments(self):
    """Parse the request arguments, falling back to defaults.

    The default software is "3ds Max" and the default output type is
    "geometry"; both are fetched (or created) through ``files_service``.

    Returns:
        The tuple ``(mode, software_id, output_type_id, name, sep)``.
    """
    parser = reqparse.RequestParser()
    default_output_type = files_service.get_or_create_output_type("geometry")
    default_software = files_service.get_or_create_software("3ds Max", "max", ".max")

    defaults = (
        ("mode", "working"),
        ("sep", "/"),
        ("software_id", default_software["id"]),
        ("output_type_id", default_output_type["id"]),
        ("name", "name"),
    )
    for argument, fallback in defaults:
        parser.add_argument(argument, default=fallback)

    parsed = parser.parse_args()
    return (
        parsed["mode"],
        parsed["software_id"],
        parsed["output_type_id"],
        parsed["name"],
        parsed["sep"],
    )
def get_arguments(self):
    """Parse the request arguments, falling back to defaults.

    The default software is "3ds Max" and the default output type is
    "geometry"; both are fetched (or created) through ``files_service``.

    Returns:
        The tuple ``(mode, version, comment, software_id, output_type_id,
        name, sep)``.
    """
    default_output_type = files_service.get_or_create_output_type("geometry")
    default_software = files_service.get_or_create_software("3ds Max", "max", ".max")

    defaults = (
        ("mode", "working"),
        ("comment", ""),
        # No type is declared for version, so a submitted value is not
        # converted to match the integer default.
        ("version", 0),
        ("software_id", default_software["id"]),
        ("output_type_id", default_output_type["id"]),
        ("name", ""),
        ("sep", "/"),
    )
    parser = reqparse.RequestParser()
    for argument, fallback in defaults:
        parser.add_argument(argument, default=fallback)

    parsed = parser.parse_args()
    return (
        parsed["mode"],
        parsed["version"],
        parsed["comment"],
        parsed["software_id"],
        parsed["output_type_id"],
        parsed["name"],
        parsed["sep"]
    )
def get_arguments(self):
    """Parse the request arguments, falling back to defaults.

    The default output type is "Geometry", fetched (or created) through
    ``files_service``; ``revision`` is parsed as an integer.

    Returns:
        The tuple ``(comment, person_id, output_type_id, revision,
        separator, extension)``.
    """
    parser = reqparse.RequestParser()
    default_output_type = files_service.get_or_create_output_type("Geometry")

    parser.add_argument("output_type_id", default=default_output_type["id"])
    for text_argument in ("person_id", "comment"):
        parser.add_argument(text_argument, default="")
    parser.add_argument("revision", default=0, type=int)
    parser.add_argument("separator", default="/")
    parser.add_argument("extension", default="")

    parsed = parser.parse_args()
    result_order = ("comment", "person_id", "output_type_id",
                    "revision", "separator", "extension")
    return tuple(parsed[key] for key in result_order)
def get_arguments(self):
    """Parse the person fields from the request.

    ``email``, ``first_name`` and ``last_name`` are required; ``phone``
    defaults to an empty string.

    Returns:
        The parsed arguments.
    """
    required_fields = (
        ("email", "The email is required."),
        ("first_name", "The first name is required."),
        ("last_name", "The last name is required."),
    )
    parser = reqparse.RequestParser()
    for field, missing_message in required_fields:
        parser.add_argument(field, help=missing_message, required=True)
    parser.add_argument("phone", default="")
    return parser.parse_args()
def __init__(self, **kwargs):
    """Store the REST API and server, declare the output fields and parse the request.

    Keyword Args:
        restapi: stored as ``self.restapi``.
        server: stored as ``self.server``.
    """
    self.restapi = kwargs['restapi']
    self.server = kwargs['server']

    iso_datetime = 'iso8601'
    self.fields = {
        'port': fields.Integer,
        'name': fields.String,
        'type': fields.String,
        'created': fields.DateTime(dt_format=iso_datetime),
        'updated': fields.DateTime(dt_format=iso_datetime)
    }

    self.parser = reqparse.RequestParser(bundle_errors=True)
    argument_types = (
        ('appeui', int),
        ('port', int),
        ('name', str),
        ('type', str),
    )
    for argument, converter in argument_types:
        self.parser.add_argument(argument, type=converter)
    # Arguments are parsed once, at construction time.
    self.args = self.parser.parse_args()
def __init__(self, **kwargs):
    """Store the REST API and server, declare the output fields and parse the request.

    Keyword Args:
        restapi: stored as ``self.restapi``.
        server: stored as ``self.server``.
    """
    self.restapi = kwargs['restapi']
    self.server = kwargs['server']
    self.fields = {
        'appif': fields.Integer,
        'name': fields.String,
    }

    self.parser = reqparse.RequestParser(bundle_errors=True)
    string_arguments = ('type', 'name', 'protocol', 'iothost',
                        'keyname', 'keyvalue')
    for argument in string_arguments:
        self.parser.add_argument(argument, type=str)
    self.parser.add_argument('pollinterval', type=int)
    # Arguments are parsed once, at construction time.
    self.args = self.parser.parse_args()
def test_get(self):
    """Check RestApplication.get for a failing and a successful lookup."""
    application = self._test_application()
    mockDBObject.return_value = application

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestApplication(restapi=self.restapi, server=self.server)

        # Lookup fails: the call must fail with NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Application, 'find', failing_find):
            yield self.assertFailure(resource.get(application.appeui), e.NotFound)

        # Lookup succeeds: the returned mapping carries the appeui
        working_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Application, 'find', working_find):
            resource.getProperties = MagicMock()
            fetched = yield resource.get(application.appeui)
            self.assertEqual(application.appeui, fetched['appeui'])
def test_put(self):
    """Check RestApplication.put: failed lookup, failed validation, success."""
    application = self._test_application()
    mockDBObject.return_value = application

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestApplication(restapi=self.restapi, server=self.server)

        # Lookup fails: the call must fail with NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Application, 'find', failing_find):
            yield self.assertFailure(resource.put(application.appeui), e.NotFound)

        working_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Application, 'find', working_find):
            # Validity check fails: the call must fail with BadRequest
            application.valid = MagicMock(return_value=(False, {}))
            yield self.assertFailure(resource.put(application.appeui), e.BadRequest)

            # Validity check passes: an empty body with status 200 comes back
            application.valid = MagicMock(return_value=(True, {}))
            application.update = MagicMock()
            outcome = yield resource.put(application.appeui)
            self.assertEqual(({}, 200), outcome)
def test_delete(self):
    """Check RestApplication.delete: device lookup succeeds, application missing, success."""
    application = self._test_application()
    mockDBObject.return_value = application

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestApplication(restapi=self.restapi, server=self.server)

        # Device lookup succeeds: the call must fail with BadRequest
        with patch.object(Device, 'find', classmethod(mockDBObject.findSuccess)):
            yield self.assertFailure(resource.delete(application.appeui), e.BadRequest)

        # Neither device nor application is found: NotFound
        with patch.object(Device, 'find', classmethod(mockDBObject.findFail)), \
                patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
            yield self.assertFailure(resource.delete(application.appeui), e.NotFound)

        # No device, application found: delete returns an empty body with 200
        with patch.object(Device, 'find', classmethod(mockDBObject.findFail)), \
                patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
            application.delete = MagicMock()
            outcome = yield resource.delete(application.appeui)
            self.assertEqual(({}, 200), outcome)
def test_get(self):
    """Check RestApplications.get for a failing lookup and a two-item listing."""
    application = self._test_application()
    mockDBObject.return_value = [application, application]

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestApplications(restapi=self.restapi, server=self.server)

        # Lookup fails: an empty dict comes back
        with patch.object(Application, 'all', classmethod(mockDBObject.findFail)):
            listing = yield resource.get()
            self.assertEqual({}, listing)

        # Lookup returns both mocked applications
        with patch.object(Application, 'all', classmethod(mockDBObject.all)):
            resource.getProperties = MagicMock()
            listing = yield resource.get()
            observed = (listing[0]['appeui'], len(listing))
            self.assertEqual((application.appeui, 2), observed)
def test_put(self):
    """Check RestDevice.put: failed lookup, failed validation, success."""
    dev = self._test_device()
    mockDBObject.return_value = dev

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestDevice(restapi=self.restapi, server=self.server)

        # Lookup fails: the call must fail with NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Device, 'find', failing_find):
            yield self.assertFailure(resource.put(dev.deveui), e.NotFound)

        working_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Device, 'find', working_find):
            # Validity check fails: the call must fail with BadRequest
            dev.valid = MagicMock(return_value=(False, {}))
            yield self.assertFailure(resource.put(dev.deveui), e.BadRequest)

            # Validity check passes: an empty body with status 200 comes back
            dev.valid = MagicMock(return_value=(True, {}))
            dev.update = MagicMock()
            outcome = yield resource.put(dev.deveui)
            self.assertEqual(({}, 200), outcome)
def test_delete(self):
    """Check RestDevice.delete for a failing and a successful lookup."""
    dev = self._test_device()
    mockDBObject.return_value = dev

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestDevice(restapi=self.restapi, server=self.server)

        # Lookup fails: the call must fail with NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Device, 'find', failing_find):
            yield self.assertFailure(resource.delete(dev.deveui), e.NotFound)

        # Lookup succeeds: delete returns an empty body with status 200
        working_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Device, 'find', working_find):
            dev.delete = MagicMock()
            outcome = yield resource.delete(dev.deveui)
            self.assertEqual(({}, 200), outcome)
def test_get(self):
    """Check RestDevices.get for a failing lookup and a two-item listing."""
    dev = self._test_device()
    mockDBObject.return_value = [dev, dev]

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestDevices(restapi=self.restapi, server=self.server)

        # Lookup fails: an empty dict comes back
        with patch.object(Device, 'all', classmethod(mockDBObject.findFail)):
            listing = yield resource.get()
            self.assertEqual({}, listing)

        # Lookup returns both mocked devices
        with patch.object(Device, 'all', classmethod(mockDBObject.all)):
            listing = yield resource.get()
            observed = (listing[0]['deveui'], len(listing))
            self.assertEqual((dev.deveui, 2), observed)
def test_delete(self):
    """Check RestAppProperty.delete for a missing application and a successful delete."""
    application = self._test_application()
    app_property = self._test_appproperty()
    parsed_args = {'port': 11}

    fake_parse = MagicMock(return_value=parsed_args)
    with patch.object(reqparse.RequestParser, 'parse_args', fake_parse):
        resource = RestAppProperty(restapi=self.restapi, server=self.server)
        mockDBObject.return_value = app_property

        # Application lookup fails: the call must fail with NotFound
        with patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
            yield self.assertFailure(resource.delete(application.appeui), e.NotFound)

        # Application and property are found: empty body with status 200
        with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)), \
                patch.object(AppProperty, 'find', classmethod(mockDBObject.findSuccess)):
            app_property.delete = MagicMock()
            outcome = yield resource.delete(application.appeui)
            self.assertEqual(({}, 200), outcome)
def test_get(self):
    """Check RestAppPropertys.get with no properties and with two properties."""
    application = self._test_application()
    app_property = self._test_appproperty()

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestAppPropertys(restapi=self.restapi, server=self.server)
        mockDBObject.return_value = application

        # The application has no properties: an empty dict comes back
        with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)), \
                patch.object(HasMany, 'get', MagicMock(return_value=[])):
            listing = yield resource.get(application.appeui)
            self.assertEqual({}, listing)

        # The application has two properties
        two_properties = MagicMock(return_value=[app_property, app_property])
        with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)), \
                patch.object(HasMany, 'get', two_properties):
            listing = yield resource.get(application.appeui)
            observed = (listing[0]['port'], len(listing))
            self.assertEqual((app_property.port, 2), observed)
def test_get(self):
    """Check RestGateway.get for a failing and a successful lookup."""
    gw = self._test_gateway()
    mockDBObject.return_value = gw

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestGateway(restapi=self.restapi, server=self.server)

        # Lookup fails: the call must fail with NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Gateway, 'find', failing_find):
            yield self.assertFailure(resource.get(gw.host), e.NotFound)

        # Lookup succeeds: the returned mapping carries the host
        working_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Gateway, 'find', working_find):
            fetched = yield resource.get(gw.host)
            self.assertEqual(gw.host, fetched['host'])
def test_put(self):
    """Check RestGateway.put: failed lookup, failed validation, success."""
    gw = self._test_gateway()
    mockDBObject.return_value = gw

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestGateway(restapi=self.restapi, server=self.server)

        # Lookup fails: the call must fail with NotFound
        failing_find = classmethod(mockDBObject.findFail)
        with patch.object(Gateway, 'find', failing_find):
            yield self.assertFailure(resource.put(gw.host), e.NotFound)

        working_find = classmethod(mockDBObject.findSuccess)
        with patch.object(Gateway, 'find', working_find):
            # Validity check fails: the call must fail with BadRequest
            gw.valid = MagicMock(return_value=(False, {}))
            yield self.assertFailure(resource.put(gw.host), e.BadRequest)

            # Validity check passes: an empty body with status 200 comes back
            gw.valid = MagicMock(return_value=(True, {}))
            gw.update = MagicMock()
            outcome = yield resource.put(gw.host)
            self.assertEqual(({}, 200), outcome)
def test_get(self):
    """Check RestGateways.get for a failing lookup and a two-item listing."""
    gw = self._test_gateway()
    mockDBObject.return_value = [gw, gw]

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestGateways(restapi=self.restapi, server=self.server)

        # Lookup fails: an empty dict comes back
        with patch.object(Gateway, 'all', classmethod(mockDBObject.findFail)):
            listing = yield resource.get()
            self.assertEqual({}, listing)

        # Lookup returns both mocked gateways
        with patch.object(Gateway, 'all', classmethod(mockDBObject.all)):
            listing = yield resource.get()
            observed = (listing[0]['host'], len(listing))
            self.assertEqual((gw.host, 2), observed)
def test_get(self):
    """Check RestAppInterface.get for a missing and an existing interface."""
    iot_interface = self._test_azureiothttps()
    app_interface = self._test_appinterface()
    iot_interface.appinterface = app_interface
    interfaceManager.interfaces = [iot_interface]

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestAppInterface(restapi=self.restapi, server=self.server)

        # The manager finds no interface: the call must fail with NotFound
        interfaceManager.getInterface = MagicMock(return_value=None)
        yield self.assertFailure(resource.get(1), e.NotFound)

        # The manager returns the interface: its name is in the result
        interfaceManager.getInterface = MagicMock(return_value=iot_interface)
        fetched = yield resource.get(app_interface.id)
        self.assertEqual(iot_interface.name, fetched['name'])
def test_delete(self):
    """Check RestAppInterface.delete for a missing interface and a successful delete."""
    iot_interface = self._test_azureiothttps()
    app_interface = self._test_appinterface()
    iot_interface.appinterface = app_interface
    interfaceManager.interfaces = [iot_interface]

    with patch.object(reqparse.RequestParser, 'parse_args'):
        resource = RestAppInterface(restapi=self.restapi, server=self.server)

        # The manager finds no interface: the call must fail with NotFound
        interfaceManager.getInterface = MagicMock(return_value=None)
        yield self.assertFailure(resource.delete(1), e.NotFound)

        # The interface exists and can be deleted: empty body with status 200
        with patch.object(AzureIotHttps, 'exists', MagicMock(return_value=True)), \
                patch.object(HasMany, 'get', MagicMock(return_value=[app_interface])), \
                patch.object(AzureIotHttps, 'delete'), \
                patch.object(AppInterface, 'delete'):
            interfaceManager.getInterface = MagicMock(return_value=iot_interface)
            outcome = yield resource.delete(1)
            self.assertEqual(({}, 200), outcome)
def post(self):
    """Create, save and activate a scheme for the network named in the request.

    Looks for the first cell on ``wlan0`` whose ssid equals the ``name``
    argument. The new scheme is named after the number of existing schemes.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('name')
    parser.add_argument('password')
    args = parser.parse_args()

    schemes = list(Scheme.all())
    cells = Cell.all('wlan0')

    newscheme = None
    # Only the first cell with a matching ssid is used.
    target_cell = next((cell for cell in cells if cell.ssid == args['name']), None)
    if target_cell is not None:
        scheme_name = 'scheme-' + str(len(schemes))
        newscheme = Scheme.for_cell('wlan0', scheme_name, target_cell, args['password'])

    if newscheme is None:
        return jsonify({'response': "network non found"})

    newscheme.save()
    newscheme.activate()
    return jsonify({'response': "ok"})
def patch(self, org_id, location_id, user_id):
    """Apply a partial update to a location manager.

    The only supported change is ``activateReminder``, which sends an
    activation reminder to a user who is not active yet.

    Returns:
        ``({}, 204)`` when nothing was requested or the reminder was sent,
        a 404 payload when the user is not a manager of the location, and a
        400 payload when the user is already active.
    """
    organization = Organization.query.get_or_404(org_id)
    user = User.query.get_or_404(user_id)

    if not user.is_location_manager(location_id):
        return {"message": "user does not exist or is not a manager"}, 404

    parser = reqparse.RequestParser()
    parser.add_argument("activateReminder", type=inputs.boolean)
    changes = parser.parse_args(strict=True)

    # Filter out null values. items() is used because it is available on
    # both Python 2 and Python 3, unlike iteritems().
    changes = dict((k, v) for k, v in changes.items() if v is not None)

    if not changes:
        return {}, 204

    if "activateReminder" in changes:
        if user.active:
            return {"message": "This user is already active"}, 400
        user.send_activation_reminder(user, organization.name)

    return {}, 204
def patch(self, org_id, user_id):
    """Apply a partial update to an organization admin.

    The only supported change is ``activateReminder``, which sends an
    activation reminder to a user who is not active yet.

    Returns:
        ``({}, 204)`` when nothing was requested or the reminder was sent,
        a 404 payload when the user is not an admin of the organization, and
        a 400 payload when the user is already active.
    """
    organization = Organization.query.get_or_404(org_id)
    user = User.query.get_or_404(user_id)

    if not user.is_org_admin(org_id):
        return {"message": "user does not exist or is not an admin"}, 404

    parser = reqparse.RequestParser()
    parser.add_argument("activateReminder", type=inputs.boolean)
    changes = parser.parse_args(strict=True)

    # Filter out null values. items() is used because it is available on
    # both Python 2 and Python 3, unlike iteritems().
    changes = dict((k, v) for k, v in changes.items() if v is not None)

    if not changes:
        return {}, 204

    if "activateReminder" in changes:
        if user.active:
            return {"message": "This user is already active"}, 400
        user.send_activation_reminder(user, organization.name)

    return {}, 204
def get(self):
    """Return one page of organizations.

    Query arguments ``offset`` (default 0) and ``limit`` (default 25) select
    the page; both are echoed back next to the marshalled organizations
    stored under ``API_ENVELOPE``.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("offset", type=int, default=0)
    parser.add_argument("limit", type=int, default=25)
    args = parser.parse_args()

    offset = args["offset"]
    limit = args["limit"]

    page = Organization.query.limit(limit).offset(offset).all()
    # Build a real list: on Python 3 map() returns a lazy iterator, which
    # cannot be serialized as part of the response.
    return {
        "offset": offset,
        "limit": limit,
        API_ENVELOPE: [marshal(organization, organization_fields)
                       for organization in page],
    }
def get(self, id=None):
    """Dispatch a GET request to list, read or delete.

    Without ``id`` the collection is listed, paginated only when both
    ``offset`` and ``limit`` are given. With ``id`` the item is deleted when
    ``delete`` equals 1 and read otherwise.
    """
    parser = reqparse.RequestParser()
    for query_argument in ('delete', 'offset', 'limit'):
        parser.add_argument(query_argument, type=int)
    args = parser.parse_args()

    if id is not None:
        if args['delete'] == 1:
            return self.delete(id)
        return self.read(id)

    # Pagination needs both values; otherwise fall back to the plain listing.
    if args['offset'] is None or args['limit'] is None:
        return self.list()
    return self.list(args['offset'], args['limit'])
def post(self, id=None):
    """Create a new item, or update the item ``id`` when one is given.

    On creation ``name``, ``path``, ``platform_id`` and ``arch_id`` are
    required JSON fields; on update every field is optional. ``options`` is
    always optional.
    """
    creating = id is None
    parser = reqparse.RequestParser()
    for field in ('name', 'path', 'platform_id', 'arch_id'):
        parser.add_argument(field, required=creating, location='json')
    parser.add_argument('options', location='json')

    payload = parser.parse_args()
    if creating:
        return self.create(payload)
    return self.update(id, payload)
def get(self, id=None):
    """Dispatch a GET request to list, read or delete.

    Without ``id`` the collection is listed, paginated only when both
    ``offset`` and ``limit`` are given. With ``id`` the item is deleted when
    ``delete`` equals 1, read with the ``'host'`` option when ``host``
    equals 1, and read plainly otherwise.
    """
    parser = reqparse.RequestParser()
    for query_argument in ('delete', 'offset', 'limit', 'host'):
        parser.add_argument(query_argument, type=int)
    args = parser.parse_args()

    if id is not None:
        # delete takes precedence over host when both are set
        if args['delete'] == 1:
            return self.delete(id)
        if args['host'] == 1:
            return self.read(id, 'host')
        return self.read(id)

    # Pagination needs both values; otherwise fall back to the plain listing.
    if args['offset'] is None or args['limit'] is None:
        return self.list()
    return self.list(args['offset'], args['limit'])