Python flask_restful.reqparse 模块,RequestParser() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask_restful.reqparse.RequestParser()

项目:NYCSL2    作者:HMProgrammingClub    | 项目源码 | 文件源码
def get(self):
        """Finish a GitHub OAuth login using the ``code`` sent by the client.

        The code is exchanged for an access token, the GitHub profile is
        fetched with that token, and the profile id is looked up in
        ``db.user``.

        Returns:
            A redirect to the signup page (after storing a record in
            ``db.tempUser``) when the user is unknown, otherwise a JSON
            response describing the logged-in user.
        """
        parser = reqparse.RequestParser()
        parser.add_argument("code", type=str, required=True, location="json")
        code = parser.parse_args()["code"]

        token_request = {
            "code": code,
            "client_id": GITHUB_CLIENT_ID,
            "client_secret": GITHUB_CLIENT_SECRET,
        }
        response = json.loads(requests.post("https://github.com/login/oauth/access_token", json=token_request).text)

        accessToken = response["access_token"]
        githubUser = json.loads(requests.get("https://api.github.com/user", data={"access_token": accessToken}).text)

        dbUser = db.user.find_one({"_id": githubUser['id']})
        if dbUser is None:
            # NOTE(review): GitHub's user payload documents the account name
            # under "login"; confirm that "username" is really present.
            newUser = {
                "_id": githubUser["id"],
                "name": githubUser['username'],
                "joinDate": datetime.datetime.today().strftime('%Y-%m-%d'),
            }
            db.tempUser.insert_one(newUser)

            # str() keeps the concatenation valid when the id is not a string.
            return redirect(WEBSITE_DOMAIN + "/signup.html#" + str(newUser["_id"]))

        session['userID'] = dbUser["_id"]
        # The original returned the undefined name ``user`` here; the record
        # that was just looked up is ``dbUser``.
        return jsonify({"loggedIn": True, "user": dbUser})
项目:NYCSL2    作者:HMProgrammingClub    | 项目源码 | 文件源码
def post(self):
        """Turn a temporary user into a full user attached to a school.

        Reads ``schoolID`` and ``userID`` from the JSON body. Aborts with 400
        when either the temporary user or the school cannot be found.

        Returns:
            A JSON response containing the stored user, with status 201.
        """
        parser = reqparse.RequestParser()
        parser.add_argument("schoolID", type=str, required=True, location="json")
        parser.add_argument("userID", type=str, required=True, location="json")

        args = parser.parse_args()

        user = db.tempUser.find_one({"_id": args["userID"]})
        if user is None:
            abort(400)

        school = db.school.find_one({"_id": args["schoolID"]})
        if school is None:
            abort(400)

        user["schoolID"] = args["schoolID"]

        db.user.insert_one(user)
        session['userID'] = user["_id"]

        # jsonify() refuses a mix of positional and keyword arguments, so the
        # status code is set on the response object instead.
        response = jsonify(user)
        response.status_code = 201
        return response
项目:1ibrary-gzhu    作者:1ibrary    | 项目源码 | 文件源码
def get_request_params(params):
    """
    Register every argument spec on a fresh parser and parse the request.

    :param params: argument specs; each item is indexed as
                   (name, type, required, location)
    :type params: list

    :return: the value returned by ``RequestParser.parse_args()``

    eg:
    params = [
               ("name", str, True, ["headers", "cookies"]),
               ("password", str, True, ["args", "form"])
             ]
    """
    request_parser = reqparse.RequestParser()
    for spec in params:
        request_parser.add_argument(
            spec[0],
            type=spec[1],
            required=spec[2],
            location=spec[3],
        )

    parsed = request_parser.parse_args()
    return parsed
项目:app-skeleton    作者:rragundez    | 项目源码 | 文件源码
def __init__(self, select_list_options, data_list_options,
                 radio_buttons_options, **kwargs):
        """Parse the form inputs of the current request and keep the widget options.

        ``self.user_inputs`` holds only the parsed arguments whose value is
        not ``None``.
        """
        def _to_date(raw):
            # Dates are expected as YYYY-MM-DD strings.
            return dt.datetime.strptime(raw, "%Y-%m-%d").date()

        def _is_checked(raw):
            # Only the literal value 'on' counts as checked.
            return raw == 'on'

        form_parser = reqparse.RequestParser()
        self.parser = form_parser
        form_parser.add_argument('text_field')
        form_parser.add_argument('number_field', type=int)
        form_parser.add_argument('date_field', type=_to_date)
        form_parser.add_argument('select_list')
        form_parser.add_argument('data_list')
        form_parser.add_argument('checkbox', default=False, type=_is_checked)
        form_parser.add_argument('radio_button')
        form_parser.add_argument('slider', type=int)
        form_parser.add_argument('comment_field')
        form_parser.add_argument('email')
        form_parser.add_argument('password', type=pbkdf2_sha256.hash)

        provided = {}
        for key, value in form_parser.parse_args().items():
            if value is not None:
                provided[key] = value
        self.user_inputs = provided

        self.select_list = select_list_options
        self.data_list = data_list_options
        self.radio_buttons = radio_buttons_options
        super().__init__()
项目:app-skeleton    作者:rragundez    | 项目源码 | 文件源码
def __init__(self, model, features, confusion_matrix, features_imp, **kwargs):
        """Parse the measurement inputs of the current request and store the model data.

        ``self.user_inputs`` holds every parsed argument that is not ``None``,
        except ``return_html``, which is stored separately in
        ``self.return_html``.
        """
        self.parser = reqparse.RequestParser()
        self.parser.add_argument('petal_length_cm', type=float)
        self.parser.add_argument('petal_width_cm', type=float)
        self.parser.add_argument('sepal_length_cm', type=float)
        self.parser.add_argument('sepal_width_cm', type=float)
        # NOTE(review): bool() is truthy for any non-empty string (including
        # "false"); confirm whether a stricter boolean parser is wanted here.
        self.parser.add_argument('return_html', type=bool, default=False)
        args = self.parser.parse_args()
        self.return_html = args['return_html']
        # Compare the key by value: ``is not`` against a string literal tests
        # object identity and is not a reliable inequality check.
        self.user_inputs = {k: v
                            for k, v in args.items()
                            if v is not None and k != 'return_html'}
        self.model = model
        self.features = features
        self.confusion_matrix = confusion_matrix
        self.features_imp = features_imp
        super().__init__()
项目:threatdetectionservice    作者:flyballlabs    | 项目源码 | 文件源码
def post(self):
        ''' process threat-intel and conditionally alert user as threat notification '''
        try:
            arg_parser = reqparse.RequestParser()
            arg_parser.add_argument('score', type=int, location='json')
            arg_parser.add_argument('threat_data', type=json_encode, location='json')
            # The parsed values are not used yet; parse_args() is still called
            # exactly as before.
            arg_parser.parse_args()
            return jsonify(status=200, message='Email delivery success')
        except Exception as err:
            # Any failure is reported back as a plain dict with its message.
            return {'status' : str(err)}
项目:hoot    作者:CatalystOfNostalgia    | 项目源码 | 文件源码
def post(self):
        """Analyse the ``text`` form field and return the emotion names found.

        Returns:
            dict with ``sentic_values`` (names of the non-None sentic values)
            and ``compound_emotions`` (pairs of names).
        """
        form_parser = reqparse.RequestParser()
        form_parser.add_argument('text', location='form', required=True)
        text = form_parser.parse_args()['text']

        analysis = comment_emotions.emotions(text, g)
        raw_sentics = analysis.get_all_sentic_values()
        raw_compounds = analysis.get_compound_emotion()

        sentic_names = []
        for sentic in raw_sentics:
            if sentic is not None:
                sentic_names.append(sentic.name)

        compound_names = []
        for first, second in raw_compounds:
            compound_names.append((first.name, second.name))

        return {
            'sentic_values': sentic_names,
            'compound_emotions': compound_names
        }
项目:cep-web-service    作者:IuryAlves    | 项目源码 | 文件源码
def get(self, zip_code=None):
        """Return a list of zip codes (``limit`` query option) or a single zip code.

        ``limit`` takes precedence over ``zip_code``. When neither is given
        the request is aborted with 400.
        """
        query_parser = reqparse.RequestParser()
        query_parser.add_argument('limit', type=int)
        limit = query_parser.parse_args(strict=True).get('limit')

        if limit is not None:
            found = Zipcode.limit(limit)
            found_count = len(found)
            if found_count < limit:
                # Fewer documents exist than were asked for.
                app.info_logger.info("Received option to list {limit}"
                                     " zipcodes but only {quantity} were found".format(limit=limit,
                                                                                       quantity=found_count))
            app.info_logger.info("Listing {quantity} zip_codes".format(quantity=found_count))
            return [document.to_dict() for document in found], 200

        if zip_code is not None:
            not_found_message = "zip code {zip_code} not found.".format(zip_code=zip_code)
            document = Zipcode.get_or_404(zip_code=zip_code, message=not_found_message)
            app.info_logger.info("Get zip_code: {zip_code}".format(zip_code=zip_code))
            return document.to_dict(), 200

        app.error_logger.error("No limit or zip_code option were found in the request")
        abort(400, message="You must provide the limit or zip_code.")
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def __init__(self):
        """Create the Flask app, its Api wrapper, a request parser and the SSL context."""
        self.__app = Flask(__name__)
        self.__api = Api(self.__app)
        self.__parser = reqparse.RequestParser()
        self.__stub_to_peer_service = None

        # The SSL context depends on conf.ENABLE_REST_SSL:
        #   0     -> no SSL context
        #   1     -> (certificate path, key path) tuple
        #   other -> SSLContext that requires and verifies peer certificates
        if conf.ENABLE_REST_SSL == 0:
            self.__ssl_context = None
            return

        if conf.ENABLE_REST_SSL == 1:
            self.__ssl_context = (conf.DEFAULT_SSL_CERT_PATH, conf.DEFAULT_SSL_KEY_PATH)
            return

        context = ssl.SSLContext(_ssl.PROTOCOL_SSLv23)
        self.__ssl_context = context

        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = False

        context.load_verify_locations(cafile=conf.DEFAULT_SSL_TRUST_CERT_PATH)
        context.load_cert_chain(conf.DEFAULT_SSL_CERT_PATH, conf.DEFAULT_SSL_KEY_PATH)
项目:syntaxnet-rest-api    作者:ljm625    | 项目源码 | 文件源码
def post(self):
        """Parse the strings of the request with the module-level ``parse_handler``.

        Returns:
            The handler's result, or a ``(dict, 400)`` failure tuple when the
            handler has not been initialized or raises.
        """
        args = self.reqparse.parse_args()
        # Identity check: ``== None`` can be overridden by a custom __eq__.
        if parse_handler is None:
            return {"result":"fail", "reason":"Please initialize the model first!"}, 400
        try:
            return parse_handler.parse_multi_string(args['strings'],tree=args['tree'])
        except Exception as e:
            # Any handler failure is reported to the client as a 400.
            return {'result': 'fail', "reason": str(e)}, 400

# class SyntaxModelQuery(Resource):
#     def __init__(self):
#         self.reqparse = reqparse.RequestParser()
#         self.reqparse.add_argument('strings', required=True, type=list, help='string is required field, you should input a list of strings', location='json')
#         self.reqparse.add_argument('syntax_folder', required=False, type=str, default=config.syntaxnetFolder,
#                                    location='json')
#         super(SyntaxModelQuery, self).__init__()
#
#     def post(self,folder):
#         args = self.reqparse.parse_args()
#         parse = parser.SyntaxnetParser(folder=args['syntax_folder'])
#         try:
#             return parse.parse_multi_string_custom(args['strings'],folder=folder)
#         except Exception, e:
#             return {'result': 'fail', "reason": e}, 400
项目:owlparser    作者:MelindaMcDaniel    | 项目源码 | 文件源码
def get(self):
        """Run ``owl_quality`` on the requested url with the selected semiotic layers.

        Raises:
            ValueError: when an excluded layer is not one of the known layers.
        """
        arg_parser = reqparse.RequestParser()
        arg_parser.add_argument('url', required=True, help='url cannot be blank!')
        arg_parser.add_argument('exclude_semiotic_layer', action='append')
        arg_parser.add_argument('domain')
        arg_parser.add_argument('already_converted', type=inputs.boolean, default=False)
        arg_parser.add_argument('debug', type=inputs.boolean, default=False)  # remove this when in production
        args = arg_parser.parse_args()

        known_layers = {'syntactic', 'semantic', 'pragmatic', 'social'}
        excluded = set(args.exclude_semiotic_layer or ())
        # Every excluded layer has to be one of the known layers.
        if not excluded.issubset(known_layers):
            raise ValueError(
                'Invalid semiotic layer. Must be one of: {}.'.format(', '.join(known_layers)))

        selected_layers = known_layers - excluded
        return owl_quality(args.url, selected_layers, args.domain,
                           already_converted=args.already_converted, debug=args.debug)
项目:rp2017-codegolf    作者:ReflectionsProjections    | 项目源码 | 文件源码
def get(self, task_id):
        ''' Endpoint for getting information about current state of task

        The ``order`` query argument selects how the answers are sorted:
        ``latest`` (default) or ``shortest``. An unknown task or an unknown
        order yields a 400 response.
        '''
        order_queries = {
            'latest': Answer.query
                            .filter_by(task_id=task_id)
                            .order_by(Answer.created_at.desc()),
            'shortest': Answer.query.filter_by(task_id=task_id)
                                    .order_by(Answer.length)
        }
        parser = reqparse.RequestParser()
        parser.add_argument('order', location='args', default='latest')
        args = parser.parse_args()
        task = manager.get_task(task_id)
        if not task:
            return make_response("Tried to query an invalid task.", 400)
        # Reject unsupported orderings explicitly instead of failing with a
        # KeyError on the lookup below.
        if args.order not in order_queries:
            return make_response("Tried to query with an invalid order.", 400)
        answers = [answer.to_dict()
                   for answer in order_queries[args.order].all()]
        return jsonify({
            'task': task,
            'answers': answers
        })
项目:rp2017-codegolf    作者:ReflectionsProjections    | 项目源码 | 文件源码
def post(self):
        '''Endpoint for getting auth token for existing user'''
        credentials_parser = reqparse.RequestParser()
        for field in ('email', 'password'):
            credentials_parser.add_argument(field, required=True)
        credentials = credentials_parser.parse_args()

        # The stored hash is the SHA-256 hex digest of the UTF-8 password.
        password_hash = hashlib.sha256(credentials.password.encode('utf-8')).hexdigest()

        matches_credentials = (User.email==credentials.email) & (User.password_hash==password_hash)
        user = db.session.query(User).filter(matches_credentials).first()
        if user is None:
            return redirect('/?message=%s' % 'Could not find account.')

        # allocate and maintain session token
        token = os.urandom(256)
        tokens[token] = credentials.email
        session['token'] = token
        session['username'] = user.username
        return redirect('/')
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def post(self):
        """Parse the air-conditioner settings, publish them and echo them back."""
        setting_names = (
            'node_select',
            'wind_directtion',
            'wind_speed',
            'working_model',
            'temp_setting',
            'switch',
        )
        settings_parser = reqparse.RequestParser()
        for name in setting_names:
            # Every setting is parsed as an integer.
            settings_parser.add_argument(name, type=int)

        settings = settings_parser.parse_args()

        print(settings)

        mqtt_pub_air_con(settings)

        return settings
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def post(self):
        """Send the switch command for unit ``tianshuoNo`` over RS485.

        ``tianshuoSwitch`` values '1' and '0' each send a command; any other
        value sends nothing.
        """
        switch_parser = reqparse.RequestParser()
        switch_parser.add_argument('tianshuoSwitch', type=str)
        switch_parser.add_argument('tianshuoNo', type=int)

        args = switch_parser.parse_args()
        print(args)

        tianshuoNo = args['tianshuoNo']
        tianshuoSwitch = args['tianshuoSwitch']

        # switch value -> (last argument of the command, message to print)
        commands = {
            '1': (9, "{0} switch on!"),
            '0': (1, "{0} switch off!"),
        }
        command = commands.get(tianshuoSwitch)
        if command is not None:
            final_arg, message = command
            output_hex = calc_modus_hex_str_to_send(tianshuoNo, 6, 0, 0, 0, final_arg)
            rs485_socket_send(output_hex)
            print(message.format(tianshuoNo))
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def get(self):
        """Return the node addresses of the barn given by ``barnNo``, ascending."""
        query_parser = reqparse.RequestParser()

        query_parser.add_argument('airconSwitch', type=str)
        query_parser.add_argument('barnNo', type=str)

        args = query_parser.parse_args()
        print(args)

        barn_number = args['barnNo']

        node_query = db.session.query(LoraNode.node_addr)
        node_query = node_query.join(GrainBarn, GrainBarn.id == LoraNode.grain_barn_id)
        node_query = node_query.filter(GrainBarn.barn_no == barn_number)
        nodes = node_query.order_by(LoraNode.node_addr.asc()).all()
        print("nodes are:", nodes)

        return nodes
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def get(self):
        """Return the node addresses of the barn given by ``barnNo``, ascending.

        ``airconSwitch`` values other than '1' and '0' are printed; '1' and
        '0' currently trigger no action.
        """
        query_parser = reqparse.RequestParser()

        query_parser.add_argument('airconSwitch', type=str)
        query_parser.add_argument('barnNo', type=str)

        args = query_parser.parse_args()
        print(args)

        barn_number = args['barnNo']
        switch_value = args['airconSwitch']

        node_query = db.session.query(LoraNode.node_addr)
        node_query = node_query.join(GrainBarn, GrainBarn.id == LoraNode.grain_barn_id)
        node_query = node_query.filter(GrainBarn.barn_no == barn_number)
        nodes = node_query.order_by(LoraNode.node_addr.asc()).all()
        print("nodes are:", nodes)

        # '1' and '0' are recognised but have no handling yet.
        if switch_value not in ('1', '0'):
            print('airconSwitch is :', switch_value)
        return nodes
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def post(self):
        """Send an alarm email built from the parsed request arguments."""
        mail_parser = reqparse.RequestParser()
        for name in ('user_email', 'subject', 'user_name', 'alarm_msg'):
            mail_parser.add_argument(name, type=str)

        args = mail_parser.parse_args()

        # Debug output: the parsed arguments framed by a banner.
        spacer = '\n' * 5
        banner = '**email**' *5
        print(spacer)
        print(banner)
        print(args)
        print(banner)
        print(spacer)

        send_email(
            args['user_email'],
            args['subject'],
            'mail/email_alarm',
            user_name=args['user_name'],
            alarm_msg=args['alarm_msg'],
        )

        return 'alarm email has sended successfuly!'
项目:bigchaindb    作者:bigchaindb    | 项目源码 | 文件源码
def get(self):
        """API endpoint to retrieve a list of links to transaction
        outputs.

            Returns:
                A list of dicts, each with the ``transaction_id`` and
                ``output_index`` of one output.
        """
        query_parser = reqparse.RequestParser()
        query_parser.add_argument('public_key', type=parameters.valid_ed25519,
                                  required=True)
        query_parser.add_argument('spent', type=parameters.valid_bool)
        args = query_parser.parse_args(strict=True)
        public_key = args['public_key']
        spent = args['spent']

        pool = current_app.config['bigchain_pool']
        with pool() as bigchain:
            links = []
            for output in bigchain.get_outputs_filtered(public_key, spent):
                links.append({'transaction_id': output.txid,
                              'output_index': output.output})
            return links
项目:bigchaindb    作者:bigchaindb    | 项目源码 | 文件源码
def get(self):
        """API endpoint to get the related blocks for a transaction.

        Return:
            A ``list`` of ``block_id``s that contain the given transaction. The
            list may be filtered when provided a status query parameter:
            "valid", "invalid", "undecided".
        """
        allowed_statuses = [Bigchain.BLOCK_VALID, Bigchain.BLOCK_INVALID, Bigchain.BLOCK_UNDECIDED]
        query_parser = reqparse.RequestParser()
        query_parser.add_argument('transaction_id', type=str, required=True)
        query_parser.add_argument('status', type=str, case_sensitive=False,
                                  choices=allowed_statuses)

        args = query_parser.parse_args(strict=True)
        tx_id = args['transaction_id']
        wanted_status = args['status']

        pool = current_app.config['bigchain_pool']

        with pool() as bigchain:
            block_statuses = bigchain.get_blocks_status_containing_tx(tx_id)
            blocks = []
            for block_id, block_status in block_statuses.items():
                # Without a status filter every block is kept.
                if wanted_status and not block_status == wanted_status:
                    continue
                blocks.append(block_id)

        return blocks
项目:zou    作者:cgwire    | 项目源码 | 文件源码
def get_arguments(self):
        """Parse the three required password fields.

        Returns:
            tuple: (old_password, password, password_2)
        """
        required_fields = (
            ("old_password", "Old password is missing."),
            ("password", "New password is missing."),
            ("password_2", "New password confirmation is missing."),
        )
        parser = reqparse.RequestParser()
        for field_name, help_text in required_fields:
            parser.add_argument(field_name, required=True, help=help_text)
        args = parser.parse_args()

        return tuple(args[field_name] for field_name, _ in required_fields)
项目:zou    作者:cgwire    | 项目源码 | 文件源码
def get_arguments(self):
        """Parse the optional arguments, falling back to their defaults.

        Returns:
            tuple: (mode, software_id, output_type_id, name, sep)
        """
        parser = reqparse.RequestParser()
        geometry_type = files_service.get_or_create_output_type("geometry")
        maxsoft = files_service.get_or_create_software("3ds Max", "max", ".max")

        defaults = (
            ("mode", "working"),
            ("sep", "/"),
            ("software_id", maxsoft["id"]),
            ("output_type_id", geometry_type["id"]),
            ("name", "name"),
        )
        for arg_name, fallback in defaults:
            parser.add_argument(arg_name, default=fallback)
        args = parser.parse_args()

        result_order = ("mode", "software_id", "output_type_id", "name", "sep")
        return tuple(args[arg_name] for arg_name in result_order)
项目:zou    作者:cgwire    | 项目源码 | 文件源码
def get_arguments(self):
        """Parse the optional arguments, falling back to their defaults.

        Returns:
            tuple: (mode, version, comment, software_id, output_type_id,
            name, sep)
        """
        geometry_type = files_service.get_or_create_output_type("geometry")
        maxsoft = files_service.get_or_create_software("3ds Max", "max", ".max")

        defaults = (
            ("mode", "working"),
            ("comment", ""),
            ("version", 0),
            ("software_id", maxsoft["id"]),
            ("output_type_id", geometry_type["id"]),
            ("name", ""),
            ("sep", "/"),
        )
        parser = reqparse.RequestParser()
        for arg_name, fallback in defaults:
            parser.add_argument(arg_name, default=fallback)
        args = parser.parse_args()

        result_order = (
            "mode",
            "version",
            "comment",
            "software_id",
            "output_type_id",
            "name",
            "sep",
        )
        return tuple(args[arg_name] for arg_name in result_order)
项目:zou    作者:cgwire    | 项目源码 | 文件源码
def get_arguments(self):
        """Parse the optional arguments, falling back to their defaults.

        Returns:
            tuple: (comment, person_id, output_type_id, revision, separator,
            extension)
        """
        parser = reqparse.RequestParser()
        output_type = files_service.get_or_create_output_type("Geometry")

        parser.add_argument("output_type_id", default=output_type["id"])
        for arg_name in ("person_id", "comment"):
            parser.add_argument(arg_name, default="")
        # revision is the only argument converted to an integer.
        parser.add_argument("revision", default=0, type=int)
        parser.add_argument("separator", default="/")
        parser.add_argument("extension", default="")
        args = parser.parse_args()

        result_order = (
            "comment",
            "person_id",
            "output_type_id",
            "revision",
            "separator",
            "extension",
        )
        return tuple(args[arg_name] for arg_name in result_order)
项目:zou    作者:cgwire    | 项目源码 | 文件源码
def get_arguments(self):
        """Parse the person fields: email, first and last name are required, phone is optional.

        Returns:
            The parsed arguments as returned by ``parse_args()``.
        """
        required_fields = (
            ("email", "The email is required."),
            ("first_name", "The first name is required."),
            ("last_name", "The last name is required."),
        )
        parser = reqparse.RequestParser()
        for field_name, help_text in required_fields:
            parser.add_argument(field_name, help=help_text, required=True)
        parser.add_argument("phone", default="")
        return parser.parse_args()
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Store the REST API and server references, the output fields and the parsed arguments.

        Expects ``restapi`` and ``server`` keyword arguments.
        """
        self.restapi = kwargs['restapi']
        self.server = kwargs['server']
        self.fields = dict(
            port=fields.Integer,
            name=fields.String,
            type=fields.String,
            created=fields.DateTime(dt_format='iso8601'),
            updated=fields.DateTime(dt_format='iso8601'),
        )
        self.parser = reqparse.RequestParser(bundle_errors=True)
        argument_types = (
            ('appeui', int),
            ('port', int),
            ('name', str),
            ('type', str),
        )
        for arg_name, arg_type in argument_types:
            self.parser.add_argument(arg_name, type=arg_type)
        self.args = self.parser.parse_args()
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Store the REST API and server references, the output fields and the parsed arguments.

        Expects ``restapi`` and ``server`` keyword arguments.
        """
        self.restapi = kwargs['restapi']
        self.server = kwargs['server']
        self.fields = dict(appif=fields.Integer, name=fields.String)
        self.parser = reqparse.RequestParser(bundle_errors=True)
        string_arguments = (
            'type',
            'name',
            'protocol',
            'iothost',
            'keyname',
            'keyvalue',
        )
        for arg_name in string_arguments:
            self.parser.add_argument(arg_name, type=str)
        self.parser.add_argument('pollinterval', type=int)
        self.args = self.parser.parse_args()
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Test get method"""

        application = self._test_application()
        mockDBObject.return_value = application

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestApplication(restapi=self.restapi, server=self.server)

            # Application lookup fails: get() must fail with NotFound
            with patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
                failing_call = rest.get(application.appeui)
                yield self.assertFailure(failing_call, e.NotFound)

            # Application lookup succeeds: the result carries the appeui
            with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
                rest.getProperties = MagicMock()
                found = yield rest.get(application.appeui)
                self.assertEqual(application.appeui, found['appeui'])
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_put(self):
        """Test put method"""

        application = self._test_application()
        mockDBObject.return_value = application

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestApplication(restapi=self.restapi, server=self.server)

            # Application lookup fails: put() must fail with NotFound
            with patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
                failing_call = rest.put(application.appeui)
                yield self.assertFailure(failing_call, e.NotFound)

            with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
                # Lookup succeeds but the validity check fails: BadRequest
                application.valid = MagicMock(return_value=(False, {}))
                invalid_call = rest.put(application.appeui)
                yield self.assertFailure(invalid_call, e.BadRequest)

                # Validity check passes: put() returns an empty body and 200
                success = ({}, 200)
                application.valid = MagicMock(return_value=(True, {}))
                application.update = MagicMock()
                outcome = yield rest.put(application.appeui)
                self.assertEqual(success, outcome)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_delete(self):
        """Test delete method"""

        application = self._test_application()
        mockDBObject.return_value = application

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestApplication(restapi=self.restapi, server=self.server)

            # A device is found for the AppEUI: delete() must fail with BadRequest
            with patch.object(Device, 'find', classmethod(mockDBObject.findSuccess)):
                blocked_call = rest.delete(application.appeui)
                yield self.assertFailure(blocked_call, e.BadRequest)

            # Neither a device nor the application is found: NotFound
            with patch.object(Device, 'find', classmethod(mockDBObject.findFail)):
                with patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
                    missing_call = rest.delete(application.appeui)
                    yield self.assertFailure(missing_call, e.NotFound)

            # No device, application found: delete() returns an empty body and 200
            with patch.object(Device, 'find', classmethod(mockDBObject.findFail)):
                with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
                    success = ({}, 200)
                    application.delete = MagicMock()
                    outcome = yield rest.delete(application.appeui)
                    self.assertEqual(success, outcome)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Test get method"""

        application = self._test_application()
        mockDBObject.return_value = [application, application]

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestApplications(restapi=self.restapi, server=self.server)

            # No applications are found: get() returns an empty dict
            with patch.object(Application, 'all', classmethod(mockDBObject.findFail)):
                nothing = {}
                outcome = yield rest.get()
                self.assertEqual(nothing, outcome)

            # Two applications are found: check the first appeui and the count
            with patch.object(Application, 'all', classmethod(mockDBObject.all)):
                rest.getProperties = MagicMock()
                wanted = (application.appeui, 2)
                listing = yield rest.get()
                summary = (listing[0]['appeui'], len(listing))
                self.assertEqual(wanted, summary)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_put(self):
        """Test put method"""

        dev = self._test_device()
        mockDBObject.return_value = dev

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestDevice(restapi=self.restapi, server=self.server)

            # Device lookup fails: put() must fail with NotFound
            with patch.object(Device, 'find', classmethod(mockDBObject.findFail)):
                failing_call = rest.put(dev.deveui)
                yield self.assertFailure(failing_call, e.NotFound)

            with patch.object(Device, 'find', classmethod(mockDBObject.findSuccess)):
                # Lookup succeeds but the validity check fails: BadRequest
                dev.valid = MagicMock(return_value=(False, {}))
                invalid_call = rest.put(dev.deveui)
                yield self.assertFailure(invalid_call, e.BadRequest)

                # Validity check passes: put() returns an empty body and 200
                success = ({}, 200)
                dev.valid = MagicMock(return_value=(True, {}))
                dev.update = MagicMock()
                outcome = yield rest.put(dev.deveui)
                self.assertEqual(success, outcome)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_delete(self):
        """Test delete method"""

        dev = self._test_device()
        mockDBObject.return_value = dev

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestDevice(restapi=self.restapi, server=self.server)

            # Device lookup fails: delete() must fail with NotFound
            with patch.object(Device, 'find', classmethod(mockDBObject.findFail)):
                failing_call = rest.delete(dev.deveui)
                yield self.assertFailure(failing_call, e.NotFound)

            # Device is found and deleted: empty body and 200
            with patch.object(Device, 'find', classmethod(mockDBObject.findSuccess)):
                success = ({}, 200)
                dev.delete = MagicMock()
                outcome = yield rest.delete(dev.deveui)
                self.assertEqual(success, outcome)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Test get method"""

        dev = self._test_device()
        mockDBObject.return_value = [dev, dev]

        with patch.object(reqparse.RequestParser, 'parse_args'):
            rest = RestDevices(restapi=self.restapi, server=self.server)

            # No devices are found: get() returns an empty dict
            with patch.object(Device, 'all', classmethod(mockDBObject.findFail)):
                nothing = {}
                outcome = yield rest.get()
                self.assertEqual(nothing, outcome)

            # Two devices are found: check the first deveui and the count
            with patch.object(Device, 'all', classmethod(mockDBObject.all)):
                wanted = (dev.deveui, 2)
                listing = yield rest.get()
                summary = (listing[0]['deveui'], len(listing))
                self.assertEqual(wanted, summary)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_delete(self):
        """Test delete method"""

        application = self._test_application()
        app_property = self._test_appproperty()
        parsed_args = {'port': 11}

        # parse_args is replaced so the resource sees the fixed port argument
        fake_parse = MagicMock(return_value=parsed_args)
        with patch.object(reqparse.RequestParser, 'parse_args', fake_parse):
            rest = RestAppProperty(restapi=self.restapi, server=self.server)
            mockDBObject.return_value = app_property

            # Application lookup fails: delete() must fail with NotFound
            with patch.object(Application, 'find', classmethod(mockDBObject.findFail)):
                failing_call = rest.delete(application.appeui)
                yield self.assertFailure(failing_call, e.NotFound)

            # Application and property are found: empty body and 200
            with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
                with patch.object(AppProperty, 'find', classmethod(mockDBObject.findSuccess)):
                    success = ({}, 200)
                    app_property.delete = MagicMock()
                    outcome = yield rest.delete(application.appeui)
                    self.assertEqual(success, outcome)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Exercise RestAppPropertys.get() with no properties and with two.

        Written as a generator: the resource calls are yielded and the values
        sent back are asserted on.
        """

        application = self._test_application()
        appprop = self._test_appproperty()

        with patch.object(reqparse.RequestParser, 'parse_args'):
            endpoint = RestAppPropertys(restapi=self.restapi, server=self.server)

            # Application is found but HasMany.get yields nothing: expect {}
            mockDBObject.return_value = application
            with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
                with patch.object(HasMany, 'get', MagicMock(return_value=[])):
                    wanted = {}
                    got = yield endpoint.get(application.appeui)
                    self.assertEqual(wanted, got)

            # HasMany.get yields two properties: expect two entries, the
            # first carrying the test property's port
            with patch.object(Application, 'find', classmethod(mockDBObject.findSuccess)):
                with patch.object(HasMany, 'get', MagicMock(return_value=[appprop, appprop])):
                    wanted = (appprop.port, 2)
                    rows = yield endpoint.get(application.appeui)
                    got = (rows[0]['port'], len(rows))
                    self.assertEqual(wanted, got)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Exercise RestGateway.get() for a missing and a found gateway.

        Written as a generator: the resource calls are yielded and the values
        sent back are asserted on.
        """

        gw = self._test_gateway()
        mockDBObject.return_value = gw

        with patch.object(reqparse.RequestParser, 'parse_args'):
            endpoint = RestGateway(restapi=self.restapi, server=self.server)

            # Gateway lookup fails: the call must fail with NotFound
            with patch.object(Gateway, 'find', classmethod(mockDBObject.findFail)):
                yield self.assertFailure(endpoint.get(gw.host), e.NotFound)

            # Gateway lookup succeeds: the result exposes the gateway's host
            with patch.object(Gateway, 'find', classmethod(mockDBObject.findSuccess)):
                fields = yield endpoint.get(gw.host)
                self.assertEqual(gw.host, fields['host'])
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_put(self):
        """Exercise RestGateway.put(): missing gateway, invalid gateway, success.

        Written as a generator: the resource calls are yielded and the values
        sent back are asserted on.
        """

        gw = self._test_gateway()
        mockDBObject.return_value = gw

        with patch.object(reqparse.RequestParser, 'parse_args'):
            endpoint = RestGateway(restapi=self.restapi, server=self.server)

            # Gateway lookup fails: the call must fail with NotFound
            with patch.object(Gateway, 'find', classmethod(mockDBObject.findFail)):
                yield self.assertFailure(endpoint.put(gw.host), e.NotFound)

            with patch.object(Gateway, 'find', classmethod(mockDBObject.findSuccess)):
                # Gateway found but valid() reports failure: BadRequest
                gw.valid = MagicMock(return_value=(False, {}))
                yield self.assertFailure(endpoint.put(gw.host), e.BadRequest)

                # valid() reports success: put answers ({}, 200)
                wanted = ({}, 200)
                gw.valid = MagicMock(return_value=(True, {}))
                gw.update = MagicMock()
                got = yield endpoint.put(gw.host)
                self.assertEqual(wanted, got)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Exercise RestGateways.get() for an empty and a populated lookup.

        Written as a generator: the resource calls are yielded and the values
        sent back are asserted on.
        """

        gw = self._test_gateway()
        mockDBObject.return_value = [gw, gw]

        with patch.object(reqparse.RequestParser, 'parse_args'):
            endpoint = RestGateways(restapi=self.restapi, server=self.server)

            # Gateway.all replaced by the failing finder: expect an empty dict
            with patch.object(Gateway, 'all', classmethod(mockDBObject.findFail)):
                wanted = {}
                got = yield endpoint.get()
                self.assertEqual(wanted, got)

            # Gateway.all replaced by the mock's all(): expect two entries,
            # the first carrying the test gateway's host
            with patch.object(Gateway, 'all', classmethod(mockDBObject.all)):
                wanted = (gw.host, 2)
                rows = yield endpoint.get()
                got = (rows[0]['host'], len(rows))
                self.assertEqual(wanted, got)
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_get(self):
        """Exercise RestAppInterface.get() for a missing and a found interface.

        Written as a generator: the resource calls are yielded and the values
        sent back are asserted on.
        """

        iface = self._test_azureiothttps()
        app_iface = self._test_appinterface()
        iface.appinterface = app_iface
        interfaceManager.interfaces = [iface]

        with patch.object(reqparse.RequestParser, 'parse_args'):
            endpoint = RestAppInterface(restapi=self.restapi, server=self.server)

            # Manager returns no interface: the call must fail with NotFound
            interfaceManager.getInterface = MagicMock(return_value=None)
            yield self.assertFailure(endpoint.get(1), e.NotFound)

            # Manager returns the interface: the result exposes its name
            interfaceManager.getInterface = MagicMock(return_value=iface)
            fields = yield endpoint.get(app_iface.id)
            self.assertEqual(iface.name, fields['name'])
项目:floranet    作者:Fluent-networks    | 项目源码 | 文件源码
def test_delete(self):
        """Exercise RestAppInterface.delete() for a missing and a found interface.

        Written as a generator: the resource calls are yielded and the values
        sent back are asserted on.
        """

        iface = self._test_azureiothttps()
        app_iface = self._test_appinterface()
        iface.appinterface = app_iface
        interfaceManager.interfaces = [iface]

        with patch.object(reqparse.RequestParser, 'parse_args'):
            endpoint = RestAppInterface(restapi=self.restapi, server=self.server)

            # Manager returns no interface: the call must fail with NotFound
            interfaceManager.getInterface = MagicMock(return_value=None)
            yield self.assertFailure(endpoint.delete(1), e.NotFound)

            # Interface exists and the delete calls are stubbed: expect ({}, 200)
            with patch.object(AzureIotHttps, 'exists', MagicMock(return_value=True)):
                with patch.object(HasMany, 'get', MagicMock(return_value=[app_iface])):
                    with patch.object(AzureIotHttps, 'delete'):
                        with patch.object(AppInterface, 'delete'):
                            interfaceManager.getInterface = MagicMock(return_value=iface)
                            wanted = ({}, 200)
                            got = yield endpoint.delete(1)
                            self.assertEqual(wanted, got)
项目:hbrain-ci    作者:HotBlackRobotics    | 项目源码 | 文件源码
def post(self):
        """Create and activate a scheme for the requested network on wlan0.

        Request arguments:
            name: SSID to look for among the cells reported for 'wlan0'.
            password: passed on to Scheme.for_cell for the matching cell.

        Returns a JSON response whose 'response' field is "ok" once the new
        scheme has been saved and activated, or "network non found" when no
        cell carries the requested SSID.
        """
        parser = reqparse.RequestParser()
        for field in ('name', 'password'):
            parser.add_argument(field)
        args = parser.parse_args()

        existing = list(Scheme.all())

        # First cell whose SSID equals the requested name, if any
        target = next((c for c in Cell.all('wlan0') if c.ssid == args['name']), None)

        created = None
        if target is not None:
            # The scheme name is numbered after the count of existing schemes
            created = Scheme.for_cell('wlan0', 'scheme-'+str(len(existing)), target, args['password'])

        if created is None:
            return jsonify({'response': "network non found"})

        created.save()
        created.activate()
        return jsonify({'response': "ok"})
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def patch(self, org_id, location_id, user_id):
        """Apply a partial update to a location manager.

        The only supported change is "activateReminder", which sends an
        activation reminder to a manager who is not active yet.

        Args:
            org_id: organization id; a 404 is raised when it does not exist.
            location_id: location the user must be a manager of.
            user_id: user id; a 404 is raised when it does not exist.

        Returns:
            ({}, 204) when there is nothing to change or the reminder was
            sent, a 404 message when the user is not a manager of the
            location, or a 400 message when the user is already active.
        """
        organization = Organization.query.get_or_404(org_id)
        user = User.query.get_or_404(user_id)

        if not user.is_location_manager(location_id):
            return {"message": "user does not exist or is not a manager"}, 404

        parser = reqparse.RequestParser()
        parser.add_argument("activateReminder", type=inputs.boolean)
        changes = parser.parse_args(strict=True)

        # Drop arguments that were not supplied. items() is used instead of
        # iteritems() so the filter works on both Python 2 and Python 3.
        changes = {k: v for k, v in changes.items() if v is not None}

        if not changes:
            return {}, 204

        # NOTE(review): the key is present for any supplied value, so
        # activateReminder=false also triggers the reminder - confirm intent.
        if "activateReminder" in changes:
            if user.active:
                return {"message": "This user is already active"}, 400

            user.send_activation_reminder(user, organization.name)

        return {}, 204
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def patch(self, org_id, user_id):
        """Apply a partial update to an organization admin.

        The only supported change is "activateReminder", which sends an
        activation reminder to an admin who is not active yet.

        Args:
            org_id: organization id; a 404 is raised when it does not exist.
            user_id: user id; a 404 is raised when it does not exist.

        Returns:
            ({}, 204) when there is nothing to change or the reminder was
            sent, a 404 message when the user is not an admin of the
            organization, or a 400 message when the user is already active.
        """
        organization = Organization.query.get_or_404(org_id)
        user = User.query.get_or_404(user_id)

        if not user.is_org_admin(org_id):
            return {"message": "user does not exist or is not an admin"}, 404

        parser = reqparse.RequestParser()
        parser.add_argument("activateReminder", type=inputs.boolean)
        changes = parser.parse_args(strict=True)

        # Drop arguments that were not supplied. items() is used instead of
        # iteritems() so the filter works on both Python 2 and Python 3.
        changes = {k: v for k, v in changes.items() if v is not None}

        if not changes:
            return {}, 204

        # NOTE(review): the key is present for any supplied value, so
        # activateReminder=false also triggers the reminder - confirm intent.
        if "activateReminder" in changes:
            if user.active:
                return {"message": "This user is already active"}, 400

            user.send_activation_reminder(user, organization.name)

        return {}, 204
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def get(self):
        """Return one page of organizations.

        Query arguments:
            offset: number of organizations to skip (int, default 0).
            limit: maximum number of organizations to return (int, default 25).

        Returns:
            A dict echoing "offset" and "limit", with the marshalled
            organizations stored as a list under API_ENVELOPE.
        """
        parser = reqparse.RequestParser()
        parser.add_argument("offset", type=int, default=0)
        parser.add_argument("limit", type=int, default=25)

        args = parser.parse_args()

        offset = args["offset"]
        limit = args["limit"]

        page = Organization.query.limit(limit).offset(offset).all()

        # A list comprehension rather than map(): on Python 3 map() returns a
        # lazy iterator, and the envelope must hold a concrete list.
        return {
            "offset": offset,
            "limit": limit,
            API_ENVELOPE: [
                marshal(organization, organization_fields)
                for organization in page
            ],
        }
项目:FuzzFlow    作者:talos-vulndev    | 项目源码 | 文件源码
def get(self, id=None):
        """Dispatch a GET to list, read or delete depending on id and arguments.

        Without an id the collection is listed; offset and limit are passed on
        only when both were supplied. With an id the record is read, unless
        the request carries delete=1, in which case it is deleted.
        """
        parser = reqparse.RequestParser()
        for int_arg in ('delete', 'offset', 'limit'):
            parser.add_argument(int_arg, type=int)
        args = parser.parse_args()

        if id is not None:
            # Single record: delete=1 turns the GET into a delete
            if args['delete'] == 1:
                return self.delete(id)
            return self.read(id)

        # Collection: paginate only when both bounds are present
        if args['offset'] is None or args['limit'] is None:
            return self.list()
        return self.list(args['offset'], args['limit'])
项目:FuzzFlow    作者:talos-vulndev    | 项目源码 | 文件源码
def post(self, id=None):
        """Create a record (no id) or update an existing one (id given).

        All fields are read from the JSON body. On create, name, path,
        platform_id and arch_id are required; on update every field is
        optional. 'options' is optional in both cases.
        """
        creating = id is None

        parser = reqparse.RequestParser()
        for field in ('name', 'path', 'platform_id', 'arch_id'):
            # Mandatory only when a new record is being created
            parser.add_argument(field, required=creating, location='json')
        parser.add_argument('options', location='json')

        payload = parser.parse_args()
        if creating:
            return self.create(payload)
        return self.update(id, payload)
项目:FuzzFlow    作者:talos-vulndev    | 项目源码 | 文件源码
def get(self, id=None):
        """Route a GET request to list(), read() or delete().

        No id: list the collection, forwarding offset and limit only when
        both are present. With an id: delete the record when delete=1 was
        sent, otherwise read it.
        """
        parser = reqparse.RequestParser()
        for int_arg in ('delete', 'offset', 'limit'):
            parser.add_argument(int_arg, type=int)
        args = parser.parse_args()

        if id is None:
            paginated = args['offset'] is not None and args['limit'] is not None
            if paginated:
                return self.list(args['offset'], args['limit'])
            return self.list()

        # delete=1 on a single record requests its removal
        if args['delete'] == 1:
            return self.delete(id)
        return self.read(id)
项目:FuzzFlow    作者:talos-vulndev    | 项目源码 | 文件源码
def get(self, id=None):
        """Serve a GET by listing, reading or deleting.

        When id is omitted the collection is listed (with offset and limit
        if both were given). When id is present the record is deleted if the
        request has delete=1, and read otherwise.
        """
        parser = reqparse.RequestParser()
        for int_arg in ('delete', 'offset', 'limit'):
            parser.add_argument(int_arg, type=int)
        args = parser.parse_args()

        if id is not None:
            wants_delete = args['delete'] == 1
            return self.delete(id) if wants_delete else self.read(id)

        # Listing: both bounds are needed for a paginated call
        if args['offset'] is not None and args['limit'] is not None:
            return self.list(args['offset'], args['limit'])
        return self.list()
项目:FuzzFlow    作者:talos-vulndev    | 项目源码 | 文件源码
def get(self, id=None):
        """Dispatch a GET to list, read, host-read or delete.

        Without an id the collection is listed; offset and limit are passed
        on only when both were supplied. With an id, delete=1 deletes the
        record, host=1 reads it with the 'host' selector, and anything else
        is a plain read. delete takes precedence over host.
        """
        parser = reqparse.RequestParser()
        for int_arg in ('delete', 'offset', 'limit', 'host'):
            parser.add_argument(int_arg, type=int)

        args = parser.parse_args()

        if id is None:
            # Collection: paginate only when both bounds are present
            if args['offset'] is None or args['limit'] is None:
                return self.list()
            return self.list(args['offset'], args['limit'])

        if args['delete'] == 1:
            return self.delete(id)
        if args['host'] == 1:
            return self.read(id, 'host')
        return self.read(id)