Python flask.request 模块,is_json() 实例源码

我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用flask.request.is_json()

项目:apizen    作者:blackmatrix7    | 项目源码 | 文件源码
def before_request():
    request_param = {key.lower(): value for key, value in request.environ.items()
                     if key in ('CONTENT_TYPE', 'CONTENT_LENGTH', 'HTTP_HOST',
                                'HTTP_ACCEPT', 'HTTP_ACCEPT_ENCODING', 'HTTP_COOKIE',
                                'HTTP_USER_AGENT', 'PATH_INFO', 'QUERY_STRING',
                                'SERVER_PROTOCOL', 'REQUEST_METHOD', 'HTTP_HOST',
                                'SERVER_PORT', 'SERVER_SOFTWARE', 'REMOTE_ADDR',
                                'REMOTE_PORT', 'HTTP_ACCEPT_LANGUAGE')}
    g.request_raw_data = request.get_data().decode('utf8')
    g.request_time = datetime.now()
    g.api_method = request.args['method']
    g.api_version = request.args['v']
    g.request_param = request_param
    g.request_form = request.form.to_dict() if request.form else None
    try:
        g.request_json = request.get_json() if request.is_json else None
    except Exception:
        raise ApiSysExceptions.invalid_json
项目:payments    作者:devops-s17-payments    | 项目源码 | 文件源码
def update_payments(id):
    try:
        if not request.is_json:
            raise DataValidationError('Invalid payment: Content Type is not json')
        data = request.get_json(silent=True)
        message = payment_service.update_payment(id,payment_replacement=data)
        rc = status.HTTP_200_OK
    except PaymentNotFoundError as e:
        message = e.message
        rc = status.HTTP_404_NOT_FOUND
    except DataValidationError as e:
        message = e.message
        rc = status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(message), rc)

######################################################################
# UPDATE AN EXISTING PAYMENT PARTIALLY
######################################################################
项目:payments    作者:devops-s17-payments    | 项目源码 | 文件源码
def update_partial_payments(id):
    try:
        if not request.is_json:
            raise DataValidationError('Invalid payment: Content Type is not json')
        data = request.get_json(silent=True)
        message = payment_service.update_payment(id,payment_attributes=data)
        rc = status.HTTP_200_OK
    except PaymentNotFoundError as e:
        message = e.message
        rc = status.HTTP_404_NOT_FOUND
    except DataValidationError as e:
        message = e.message
        rc = status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(message), rc)


######################################################################
# DELETE A PAYMENT
######################################################################
项目:payments    作者:devops-s17-payments    | 项目源码 | 文件源码
def charge_payment(user_id):
    try:
        if not request.data:
            raise DataValidationError('Invalid request: body of request contained no data')
        if not request.is_json:
            raise DataValidationError('Invalid request: request not json')
        data = request.get_json()
        if data['amount']:
            if(data['amount'] < 0):
                raise DataValidationError('Invalid request: Order amount is negative.')
            else:
                resp = payment_service.perform_payment_action(user_id,payment_attributes=data)
                if resp == True:
                    message = {'success' : 'Default payment method for user_id: %s has been charged $%.2f' % (str(user_id), data['amount'])}
                    rc = status.HTTP_200_OK
    except DataValidationError as e:
        message = {'error' : e.message}
        rc = status.HTTP_400_BAD_REQUEST
    except KeyError as e:
        message = {'error' : 'Invalid request: body of request does not have the amount to be charged'}
        rc = status.HTTP_400_BAD_REQUEST
    return make_response(jsonify(message), rc)
项目:fastapi    作者:zhangnian    | 项目源码 | 文件源码
def get_json_arg(name, parser=None, validator=None):
    jdata = request.get_json(force=True, silent=True)

    if not request.is_json:
        raise APIError(ret=1, msg='????????')

    if jdata is None:
        raise APIError(ret=1, msg='????????')

    val = jdata.get(name, None)
    if val is None:
        raise APIError(ret=1, msg='????:{}'.format(name))

    if parser and callable(parser):
        try:
            val = parser(val)
        except Exception as e:
            raise APIError(ret=1, msg='????:{}??'.format(name))

    if validator and callable(validator):
        if not validator(val):
            raise APIError(ret=1, msg='??:{}???'.format(name))
    return val
项目:fastapi    作者:zhangnian    | 项目源码 | 文件源码
def get_json_arg_default(name, default=None, parser=None, validator=None):
    jdata = request.get_json(force=True, silent=True)

    if not request.is_json:
        raise APIError(ret=1, msg='????????')

    if jdata is None:
        raise APIError(ret=1, msg='????????')

    val = jdata.get(name, None)
    if val is None:
        if default is not None:
            return default
        raise APIError(ret=1, msg='????:{}'.format(name))

    if parser and callable(parser):
        try:
            val = parser(val)
        except Exception as e:
            raise APIError(ret=1, msg='????:{}??'.format(name))

    if validator and callable(validator):
        if not validator(val):
            raise APIError(ret=1, msg='??:{}???'.format(name))
    return val
项目:legacy    作者:kytos    | 项目源码 | 文件源码
def save_topology(name):
        """Save a topology layout in a file.

        This method get a json topology from request and puts this in a file.

        Parameters:
            name (string): name of the topology to  be saved or loaded.

        Returns:
            topology (string): topology using json format.
        """
        if not request.is_json:
            return json.dumps('{"error": "gt was not a JSON request"}'), 400
        topology = request.get_json()
        with open(join(settings.TOPOLOGY_DIR, name + '.json'), 'w') as outfile:
            json.dump(topology, outfile)
        return json.dumps({'response': 'Saved'}), 201
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def login():
    if not request.is_json:
        return jsonify({"msg": "Missing JSON in request"}), 400

    username = request.json.get('username', None)
    password = request.json.get('password', None)
    if not username:
        return jsonify({"msg": "Missing username parameter"}), 400
    if not password:
        return jsonify({"msg": "Missing password parameter"}), 400

    if username != 'test' or password != 'test':
        return jsonify({"msg": "Bad username or password"}), 401

    # Identity can be any data that is json serializable
    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token), 200


# Protect a view with jwt_required, which requires a valid access token
# in the request to access.
项目:cookiecutter-flask-restful    作者:karec    | 项目源码 | 文件源码
def login():
    """Authenticate user and return token
    """
    if not request.is_json:
        return jsonify({"msg": "Missing JSON in request"}), 400

    username = request.json.get('username', None)
    password = request.json.get('password', None)
    if not username or not password:
        return jsonify({"msg": "Missing username or password"}), 400

    user = User.query.filter_by(username=username).first()
    if user is None or not pwd_context.verify(password, user.password):
        return jsonify({"msg": "Bad credentials"}), 400

    access_token = create_access_token(identity=user.id)
    refresh_token = create_refresh_token(identity=user.id)

    ret = {
        'access_token': access_token,
        'refresh_token': refresh_token
    }
    return jsonify(ret), 200
项目:aihackathon    作者:nicoheidtke    | 项目源码 | 文件源码
def process_image():
    try:
        assert request.is_json
        assert 'imageUrl' in request.json

        image_url = request.json['imageUrl']
        trusted_source_url, trusted_source_image = None, None
        trusted_data = check_url(image_url)
        if trusted_data is not None and 'image_url' in trusted_data:
            trusted_source_image = trusted_data.image_url
            trusted_source_url = trusted_data.url

        result = {
            'status': STATUS_OK,
            'trusted_source_url': trusted_source_url,
            'trusted_source_image': trusted_source_image,
            'source_url': image_url
        }
        return jsonify(result)
    except AssertionError:
        return make_response(jsonify({'status': STATUS_ERROR, 'message': 'malformed request'}), 400)
项目:payments    作者:devops-s17-payments    | 项目源码 | 文件源码
def set_default(user_id):
    try:
        if not request.data:
            raise DataValidationError('Invalid request: body of request contained no data')
        if not request.is_json:
            raise DataValidationError('Invalid request: request not json')
        data = request.get_json()
        if data['payment_id']:
            resp = payment_service.perform_payment_action(user_id,payment_attributes=data)
            if resp == True:
                message = { 'success' : 'Payment with id: %s set as default for user with user_id: %s.' % (data['payment_id'], str(user_id)) }
                rc = status.HTTP_200_OK
            else:
                message = { 'error' : 'No Payment with id: %s was found for user with user_id: %s.' % (data['payment_id'], str(user_id)) }
                rc = status.HTTP_404_NOT_FOUND
    except DataValidationError as e:
        message = {'error' : e.message}
        rc = status.HTTP_400_BAD_REQUEST
    except KeyError as e:
        message = {'error' : 'Invalid request: body of request does not have the payment_id'}
        rc = status.HTTP_400_BAD_REQUEST

    return make_response(jsonify(message), rc)

######################################################################
# RETRIEVE A PAYMENT
######################################################################
项目:flask-breathalyzer    作者:mindflayer    | 项目源码 | 文件源码
def get_http_info(self):
        """
        Determine how to retrieve actual data, basically if it's JSON or not.
        """
        try:
            is_json = request.is_json
        except AttributeError:
            is_json = request.get_json(silent=True) is not None
        if is_json:
            retriever = Breathalyzer.get_json_data
        else:
            retriever = Breathalyzer.get_form_data
        return self.get_http_info_with_retriever(retriever)
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def is_json(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not request.is_json:
            return jsonify(error=True,
                           status=400,
                           message='Content-Type should be application/json')
        return f(*args, **kwargs)

    return decorated_function
项目:sample-platform    作者:CCExtractor    | 项目源码 | 文件源码
def request_from_github(abort_code=418):
    def decorator(f):
        """
        Decorator that checks if a request is a GitHub hook request
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if request.method != 'POST':
                return 'OK'
            else:
                # Do initial validations on required headers
                if 'X-Github-Event' not in request.headers:
                    abort(abort_code)
                if 'X-Github-Delivery' not in request.headers:
                    abort(abort_code)
                if 'X-Hub-Signature' not in request.headers:
                    abort(abort_code)
                if not request.is_json:
                    abort(abort_code)
                if 'User-Agent' not in request.headers:
                    abort(abort_code)
                ua = request.headers.get('User-Agent')
                if not ua.startswith('GitHub-Hookshot/'):
                    abort(abort_code)

                request_ip = ip_address(u'{0}'.format(request.remote_addr))
                meta_json = requests.get('https://api.github.com/meta').json()
                hook_blocks = meta_json['hooks']

                # Check if the POST request is from GitHub
                for block in hook_blocks:
                    if ip_address(request_ip) in ip_network(block):
                        break
                else:
                    g.log.info("Unauthorized attempt to deploy by IP %s" %
                               request_ip)
                    abort(abort_code)
                return f(*args, **kwargs)
        return decorated_function
    return decorator
项目:indi-lite-tools    作者:GuLinux    | 项目源码 | 文件源码
def with_json_request(func):
    @wraps(func)
    def dec(*args, **kwargs):
        if not request.is_json:
            return 'Bad json request', 400
        return func(*args, **kwargs)
    return dec
项目:dbs-back    作者:Beit-Hatfutsot    | 项目源码 | 文件源码
def get_items(slugs):
    if slugs:
        items_list = slugs.split(',')
    elif request.is_json:
        items_list = request.get_json()

    # Check if there are items from ugc collection and test their access control
    ugc_items = []
    for item in items_list:
        if item.startswith('ugc'):
            ugc_items.append(item)
    user_oid = current_user.is_authenticated and current_user.id

    items = fetch_items(items_list)
    if len(items) == 1 and 'error_code' in items[0]:
        error = items[0]
        abort(error['error_code'],  error['msg'])
    else:
        # Cast items to list
        if type(items) != list:
            items = [items]
        # Check that each of the ugc_items is accessible by the logged in user
        for ugc_item_id in [i[4:] for i in ugc_items]:
            for item in items:
                if item['_id'] == ugc_item_id and item.has_key('owner') and item['owner'] != unicode(user_oid):
                    abort(403, 'You are not authorized to access item ugc.{}'.format(str(item['_id'])))
        return humanify(items)
项目:lab5    作者:zlotus    | 项目源码 | 文件源码
def test_instance_attachment_collection_service(test_id):
    resp = flask.Response(json.dumps({'status': 'failed'}))
    if request.method == 'POST':
        if request.is_xhr:
            file = request.files['attachments']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                dirpath = os.path.join(UPLOAD_FOLDER, str(test_id), 'attachments')
                os.makedirs(dirpath, exist_ok=True)
                filepath = os.path.join(dirpath, filename)
                file.save(filepath)

                # db: update the attachment for the test where test.id = test_id
                test = Test.query.get(test_id)
                test.test_attachment.append(TestAttachment(name=filename, attachment_url=filepath))
                db.session.commit()

                resp = flask.Response(json.dumps({'status': 'success', 'url': filepath}))
    elif request.method == 'DELETE':
        if request.is_json:
            filename = secure_filename(request.json['removedFile'])
            dirpath = os.path.join(UPLOAD_FOLDER, str(test_id), 'attachments')
            filepath = os.path.join(dirpath, filename)
            try:
                os.remove(filepath)
                # db: delete the attachment for the test where test.id = test_id
                TestAttachment.query.filter(
                    (TestAttachment.test_id == test_id) &
                    (TestAttachment.name == filename)
                ).delete()
                db.session.commit()

                resp = flask.Response(json.dumps({'status': 'success', 'url': filepath}))
            except FileNotFoundError:
                print('FileNotFound: ', filepath)
                resp = flask.Response(json.dumps({'status': 'failed', 'url': filepath}))

    return set_debug_response_header(resp)
项目:lab5    作者:zlotus    | 项目源码 | 文件源码
def formulation_instance_service(f_id):
    resp = flask.Response(json.dumps({'status': 'failed'}))
    if request.method == 'PUT':
        if request.is_json:
            p_json_list = request.json['properties']
            formulation = Formulation.query.get(f_id)
            formulation.formulation_property.delete()
            # fp_rs = Formulation.query.get(f_id).formulation_property
            # FormulationProperty.query.filter(FormulationProperty.formulation_id == f_id).delete()
            for p in p_json_list:
                formulation.formulation_property.append(FormulationProperty(
                    key=p['keyName'],
                    value=p['valueName']
                ))
            db.session.commit()
            p_list = []
            for p in formulation.formulation_property:
                p_list.append({p.key: p.value})
            resp = flask.Response(json.dumps({'status': 'success',
                                              'formulation_id': formulation.id,
                                              'formulation_properties': p_list}))
    elif request.method == 'DELETE':
        formulation = Formulation.query.get(f_id)
        test_count = formulation.test.count()
        if test_count > 0:
            resp = flask.Response(json.dumps({'status': 'failed',
                                              'error': 'the tests count of formulation id %d is not 0, '
                                                       'delete tests first'}))
        else:
            Formulation.query.filter(Formulation.id == f_id).delete()
            db.session.commit()
    return set_debug_response_header(resp)
项目:aihackathon    作者:nicoheidtke    | 项目源码 | 文件源码
def process_text():
    try:
        assert request.is_json

        tweet = request.json['text'] if 'text' in request.json else None
        url = request.json['pageUrl'] if 'pageUrl' in request.json else None

        share, total_engaged, resource_trust = None, None, None
        if url:
            comments, reaction, share, total_engaged = check_virality(url)
            resource_trust = check_info_source(url)

        analysis_result = None
        if tweet:
            tweet = tweet if len(tweet) < 1000 else tweet[:1000]
            analysis_result = compare_tweet_with_storage(tweet)

        result = {
            'status': STATUS_OK,
            'data': {
                'credibility': analysis_result,
                'engaged': total_engaged,
                'shares': share,
                'site_credibility': resource_trust
            },
            'source_text': tweet
        }
        return jsonify(result)
    except AssertionError:
        return make_response(jsonify({'status': STATUS_ERROR, 'message': 'malformed request'}), 400)
    # except:
    #     return make_response(jsonify({'status': STATUS_ERROR, 'message': 'oops...'}), 500)
项目:rpc-gating    作者:rcbops    | 项目源码 | 文件源码
def json_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not request.is_json:
            abort(400, "JSON Required")
        return f(*args, **kwargs)
    return decorated_function
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def post(self, src_vnf, src_intfs, dst_vnf, dst_intfs):
        """
         A post request to "/v1/chain/<src_vnf>/<src_intfs>/<dst_vnf>/<dst_intfs>"
         will create a chain between two interfaces at the specified vnfs.
         The POST data contains the path like this.
         { "path": ["dc1.s1", "s1", "dc4.s1"]}
         path specifies the destination vnf and interface and contains a list of switches
         that the path traverses. The path may not contain single hop loops like:
         [s1, s2, s1].
         This is a limitation of Ryu, as Ryu does not allow the `INPUT_PORT` action!

        :param src_vnf: Name of the source VNF
        :type src_vnf: ``str``
        :param src_intfs: Name of the source VNF interface to chain on
        :type src_intfs: ``str``
        :param dst_vnf: Name of the destination VNF
        :type dst_vnf: ``str``
        :param dst_intfs: Name of the destination VNF interface to chain on
        :type dst_intfs: ``str``
        :return: flask.Response 200 if set up correctly else 500 also returns the cookie as dict {'cookie': value}
         501 if one of the VNF / intfs does not exist
        :rtype: :class:`flask.Response`

        """

        if request.is_json:
            path = request.json.get('path')
            layer2 = request.json.get('layer2', True)
        else:
            path = None
            layer2 = True

        # check if both VNFs exist
        if not self.api.manage.check_vnf_intf_pair(src_vnf, src_intfs):
            return Response(u"VNF %s or intfs %s does not exist" % (src_vnf, src_intfs), status=501,
                            mimetype="application/json")
        if not self.api.manage.check_vnf_intf_pair(dst_vnf, dst_intfs):
            return Response(u"VNF %s or intfs %s does not exist" % (dst_vnf, dst_intfs), status=501,
                            mimetype="application/json")
        try:
            cookie = self.api.manage.network_action_start(src_vnf, dst_vnf, vnf_src_interface=src_intfs,
                                                          vnf_dst_interface=dst_intfs, bidirectional=True,
                                                          path=path, layer2=layer2)
            resp = {'cookie': cookie}
            return Response(json.dumps(resp), status=200, mimetype="application/json")

        except Exception as e:
            logging.exception(u"%s: Error setting up the chain.\n %s" % (__name__, e))
            return Response(u"Error setting up the chain", status=500, mimetype="application/json")
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def post(self, src_dc, src_stack, src_vnf, src_intfs, dst_dc, dst_stack, dst_vnf, dst_intfs):
        """
         A post request to "/v1/chain/<src_dc>/<src_stack>/<src_vnf>/<src_intfs>/<dst_dc>/<dst_stack>/<dst_vnf>/<dst_intfs>"
         will create a chain between two interfaces at the specified vnfs.
         The POST data contains the path like this.
         { "path": ["dc1.s1", "s1", "dc4.s1"]}
         path specifies the destination vnf and interface and contains a list of switches
         that the path traverses. The path may not contain single hop loops like:
         [s1, s2, s1].
         This is a limitation of Ryu, as Ryu does not allow the `INPUT_PORT` action!

        :param src_vnf: Name of the source VNF
        :type src_vnf: ``str``
        :param src_intfs: Name of the source VNF interface to chain on
        :type src_intfs: ``str``
        :param dst_vnf: Name of the destination VNF
        :type dst_vnf: ``str``
        :param dst_intfs: Name of the destination VNF interface to chain on
        :type dst_intfs: ``str``
        :return: flask.Response 200 if set up correctly else 500 also returns the cookie as dict {'cookie': value}
         501 if vnf / intfs do not exist
        :rtype: :class:`flask.Response`

        """
        if request.is_json:
            path = request.json.get('path')
            layer2 = request.json.get('layer2', True)
        else:
            path = None
            layer2 = True

        # search for real names
        real_names = self._findNames(src_dc, src_stack, src_vnf, src_intfs, dst_dc, dst_stack, dst_vnf, dst_intfs)
        if type(real_names) is not tuple:
            # something went wrong
            return real_names

        container_src, container_dst, interface_src, interface_dst = real_names

        try:
            cookie = self.api.manage.network_action_start(container_src, container_dst, vnf_src_interface=interface_src,
                                                          vnf_dst_interface=interface_dst, bidirectional=True,
                                                          path=path, layer2=layer2)
            resp = {'cookie': cookie}
            return Response(json.dumps(resp), status=200, mimetype="application/json")

        except Exception as e:
            logging.exception(u"%s: Error setting up the chain.\n %s" % (__name__, e))
            return Response(u"Error setting up the chain", status=500, mimetype="application/json")
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def start_trex():
    if request.method == 'POST':
        if request.is_json:
            req_data = request.get_json(cache=False)
            if req_data is not None:
                try:
                    traffic_config = {
                        "pps": int(req_data['input']['pps'].encode("ascii")),
                        "src_n": int(req_data['input']['src_n'].encode("ascii")),
                        "pkts_n": int(req_data['input']['pkts_n'].encode("ascii")),
                        "mac_dest": req_data['input']['mac_dest'].encode("ascii"),
                        "packet_size": int(req_data['input']['packet_size'].encode("ascii")),
                        "mult": int(req_data['input']['mult'].encode("ascii")),
                    }
                    if traffic_config["pps"] > 0 and traffic_config["mac_dest"]:
                        """If # of PPS or MAC addresses is positive"""
                        if not Trex.is_running():
                            try:
                                start_traffic(traffic_config)
                            except:
                                import traceback
                                print "exception", traceback.format_exc()
                                return responsify('error', get_error_message('trex_not_start'))
                            return responsify('ok', 'start')
                        else:
                            return responsify('error', get_error_message('trex_already_running'))
                    else:
                        """Stop TRex if 0 PPS or MAC addresses received"""
                        stop_trex(True)
                        return responsify('error', get_error_message('pps_must_be_positive'))

                except (AttributeError, KeyError):
                    return responsify('error', get_error_message('not_json'))
                except ValueError:
                    return responsify('error', get_error_message('ascii_error'))
            else:
                return responsify('error', get_error_message('not_json'))
        else:
            return responsify('error', get_error_message('not_json'))
    else:
        return responsify("ok", "ok")


# Stop sending traffic
项目:lab5    作者:zlotus    | 项目源码 | 文件源码
def test_collection_service():
    resp = flask.Response(json.dumps({'status': 'failed'}))
    if request.method == 'GET':
        if request.args.get('formulationID'):
            formulation_id = request.args['formulationID']
            test_list = []
            test_rs = Formulation.query.get(formulation_id).test
            for test_r in test_rs:
                test_list.append({
                    'id': test_r.id,
                    'name': test_r.name,
                    'measure_type': test_r.measure_type,
                    'thickness': test_r.thickness,
                    'temperature_max': test_r.temperature_max,
                    'temperature_min': test_r.temperature_min,
                    'frequency_max': test_r.frequency_max,
                    'frequency_min': test_r.frequency_min,
                    'test_type': test_r.test_type,
                    'data_file_url': test_r.data_file_url,
                    'date': test_r.date.timestamp(),
                    'formulation_id': test_r.formulation_id,
                })
            resp = flask.Response(json.dumps({'status': 'success', 'test_list': test_list}))
    elif request.method == 'POST':
        if request.is_json:
            temperature, temperature_min, temperature_max = 0, 0, 0
            frequency, frequency_min, frequency_max = 0, 0, 0
            measure_type = request.json['measureType']
            date_ts = request.json.pop('date', datetime.now().timestamp())
            if measure_type == 'temperature':
                frequency_min = frequency_max = request.json['frequencyMin']
                temperature_min = request.json['temperatureMin']
                temperature_max = request.json['temperatureMax']
            elif measure_type == 'frequency':
                temperature_min = temperature_max = request.json['temperatureMin']
                frequency_min = request.json['frequencyMin']
                frequency_max = request.json['frequencyMax']
            test = Test(name=request.json['name'],
                        measure_type=measure_type,
                        thickness=request.json['thickness'],
                        temperature_max=temperature_max,
                        temperature_min=temperature_min,
                        frequency_max=frequency_max,
                        frequency_min=frequency_min,
                        test_type=request.json['testType'],
                        formulation_id=request.json['selectedFormulationID'],
                        date=datetime.fromtimestamp(date_ts))
            db.session.add(test)
            db.session.commit()
            resp = flask.Response(json.dumps({'status': 'success',
                                              'test_id': test.id,
                                              'test_name': test.name}))

    return set_debug_response_header(resp)
项目:lab5    作者:zlotus    | 项目源码 | 文件源码
def formulation_collection_service():
    resp = flask.Response(json.dumps({'status': 'failed'}))
    if request.method == 'GET':
        formulations = []
        formulations_rs = Formulation.query.all()
        for formulation_r in formulations_rs:
            formulation_properties = []
            formulations_property_rs = formulation_r.formulation_property.all()
            for formulations_property_r in formulations_property_rs:
                formulation_properties.append({formulations_property_r.key: formulations_property_r.value})

            formulations.append({
                'id': formulation_r.id,
                'name': formulation_r.name,
                'date': formulation_r.date.timestamp(),
                'formulation_properties': formulation_properties
            })
        resp = flask.Response(json.dumps({'status': 'success', 'formulations': formulations}))
    elif request.method == 'POST':
        if request.is_json:
            req_json = request.json
            property_list, property_key_list = [], []
            name = req_json.pop('formulationName', 'formulation-%f' % datetime.now().timestamp())
            date_ts = req_json.pop('formulationDate', datetime.now().timestamp())
            property_key_list = [x for x in req_json if x.startswith('key-')]
            property_key_list.sort(key=lambda x: int(x.replace('key-', '')))
            for fpkey in property_key_list:
                key = req_json[fpkey]
                val = req_json['value-%s' % fpkey.split('-', maxsplit=1)[1]]
                property_list.append((key, val))
            # create new formulation in db
            formulation = Formulation(name=name, date=datetime.fromtimestamp(date_ts))
            db.session.add(formulation)
            db.session.commit()
            for fpkey, fpval in property_list:
                formulation.formulation_property.append(FormulationProperty(key=fpkey, value=fpval))
            db.session.commit()
            resp = flask.Response(json.dumps({'status': 'success',
                                              'new_formulation_id': formulation.id,
                                              'new_formulation_name': formulation.name}))

    return set_debug_response_header(resp)