Python flask.request 模块,content_type() 实例源码


项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def post(current_user, bucket_id):
    Storing an item into a Bucket
    :param current_user: User
    :param bucket_id: Bucket Id
    :return: Http Response
    if not request.content_type == 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    data = request.get_json()
    item_name = data.get('name')
    if not item_name:
        return response('failed', 'No name or value attribute found', 401)

    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        return response('failed', 'User has no Bucket with Id ' + bucket_id, 202)

    # Save the Bucket Item into the Database
    item = BucketItem(item_name.lower(), data.get('description', None),
    return response_with_bucket_item('success', item, 200)
项目:heroku-python-boilerplate    作者:chavli    | 项目源码 | 文件源码
def validate_jpeg_binary(func):
    """ checks the mimetype and the binary data to ensure it's a JPEG """
    def wrapper(*args, **kwargs):
        if request.content_type != "image/jpeg":
            return ErrorResponseJson("invalid content type: {}".format(request.content_type)).make_response()
        if imghdr.test_jpeg(, None) != "jpeg":
            return ErrorResponseJson("invalid jpeg data").make_response()
        return func(*args, **kwargs)
    return wrapper
项目:pia    作者:soasme    | 项目源码 | 文件源码
def builtin_jq():
    Builtin program: `jq`.
    It will run a `jq` progress and return a json object.
    program = request.args.get('program', ".")
    command =
        data = jq(program, command)
        resp = make_response(data)
        resp.content_type = 'application/json'
        return resp
    except InvalidJQFilter as exception:
        return jsonify(message=str(exception)), 400
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def reset_password(current_user):
    if request.content_type == "application/json":
        data = request.get_json()
        old_password = data.get('oldPassword')
        new_password = data.get('newPassword')
        password_confirmation = data.get('passwordConfirmation')
        if not old_password or not new_password or not password_confirmation:
            return response('failed', "Missing required attributes", 400)
        if bcrypt.check_password_hash(current_user.password, old_password.encode('utf-8')):
            if not new_password == password_confirmation:
                return response('failed', 'New Passwords do not match', 400)
            if not len(new_password) > 4:
                return response('failed', 'New password should be greater than four characters long', 400)
            return response('success', 'Password reset successfully', 200)
        return response('failed', "Incorrect password", 401)
    return response('failed', 'Content type must be json', 400)

# Register classes as views
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def edit_bucket(current_user, bucket_id):
    Validate the bucket Id. Also check for the name attribute in the json payload.
    If the name exists update the bucket with the new name.
    :param current_user: Current User
    :param bucket_id: Bucket Id
    :return: Http Json response
    if request.content_type == 'application/json':
        data = request.get_json()
        name = data.get('name')
        if name:
            except ValueError:
                return response('failed', 'Please provide a valid Bucket Id', 400)
            user_bucket = User.get_by_id(
            if user_bucket:
                return response_for_created_bucket(user_bucket, 201)
            return response('failed', 'The Bucket with Id ' + bucket_id + ' does not exist', 404)
        return response('failed', 'No attribute or value was specified, nothing was changed', 400)
    return response('failed', 'Content-type must be json', 202)
项目:pia    作者:soasme    | 项目源码 | 文件源码
def builtin_echo():
    Builtin program: `echo`.
    It will response form data.
    resp = make_response(
    resp.content_type = request.content_type
    return resp
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def report(node):
    if not node.maintenance_mode:
            node.active_config_version = int(request.args.get('version'))
        except (ValueError, TypeError):
            return abort(400)

        if request.content_type != 'application/json':
            return abort(400)

        provision = models.Provision()
        provision.node = node
        provision.config_version = node.active_config_version
        provision.ignition_config =
        if node.target_config_version == node.active_config_version:
            provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root)


        models.Disk.wipe_next_boot: False

    if node.cluster.are_etcd_nodes_configured:
        node.cluster.assert_etcd_cluster_exists = True

    return Response('ok', mimetype='application/json')
项目:WebAct    作者:CreatCodeBuild    | 项目源码 | 文件源码
def image():
    # json = request.get_json()
    print('contentType', request.content_type)
    # length = len(json['binaryData'])
    print('data length', len(
    resultImageBuffer = process_image(
    # resultJsonString = JSONEncoder().encode({'resultImageBuffer': resultImageBuffer})
    return resultImageBuffer
项目:apizen    作者:blackmatrix7    | 项目源码 | 文件源码
def default_after_request(param):
    response_param = {'charset': param.charset,
                      'content_length': param.content_length,
                      'content_type': param.content_type,
                      'content_encoding': param.content_encoding,
                      'mimetype': param.mimetype,
                      'response': g.result if hasattr(g, 'result') else None,
                      'status': param.status,
                      'status_code': param.status_code}
    g.response_time =
    time_consuming = str(g.response_time - g.request_time)
    log_info = {'api_method': g.get('api_method'), 'api_version': g.get('api_version'),
                'request_param': g.get('request_param'), 'request_form': g.get('request_form'),
                'querystring': g.get('request_param')['query_string'], 'request_json': g.get('request_json'),
                'response_param': response_param, 'request_raw_data': g.request_raw_data,
                'request_time': g.get('request_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
                'response_time': g.get('response_time').strftime(current_app.config['APIZEN_DATETIME_FMT']),
                'time_consuming': time_consuming}
    if param.status_code >= 400:
    return param

# ??????
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def edit_item(current_user, bucket_id, item_id):
    Edit an item with a valid Id. The request content-type must be json and also the Bucket
    in which the item belongs must be among the user`s Buckets.
    The name of the item must be present in the payload but the description is optional.
    :param current_user: User
    :param bucket_id: Bucket Id
    :param item_id: Item Id
    :return: Response of Edit Item
    if not request.content_type == 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    except ValueError:
        return response('failed', 'Provide a valid item Id', 202)

    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        return response('failed', 'User has no Bucket with Id ' + bucket_id, 202)

    # Get the item
    item = bucket.items.filter_by(id=item_id).first()
    if not item:

    # Check for Json data
    request_json_data = request.get_json()
    item_new_name = request_json_data.get('name')
    item_new_description = request_json_data.get('description', None)

    if not request_json_data:
        return response('failed', 'No attributes specified in the request', 401)

    if not item_new_name:
        return response('failed', 'No name or value attribute found', 401)

    # Update the item record
    item.update(item_new_name, item_new_description)
    return response_with_bucket_item('success', item, 200)
项目:plex_autoscan    作者:l3uddz    | 项目源码 | 文件源码
def client_pushed():
    if request.content_type == 'application/json':
        data = request.get_json(silent=True)
        data = request.form.to_dict()

    if not data:
        logger.error("Invalid scan request from: %r", request.remote_addr)
    logger.debug("Client %r request dump:\n%s", request.remote_addr, json.dumps(data, indent=4, sort_keys=True))

    if ('eventType' in data and data['eventType'] == 'Test') or ('EventType' in data and data['EventType'] == 'Test'):"Client %r made a test request, event: '%s'", request.remote_addr, 'Test')
    elif 'eventType' in data and data['eventType'] == 'Manual':"Client %r made a manual scan request for: '%s'", request.remote_addr, data['filepath'])
        final_path = utils.map_pushed_path(conf.configs, data['filepath'])
        # ignore this request?
        ignore, ignore_match = utils.should_ignore(final_path, conf.configs)
        if ignore:
  "Ignored scan request for '%s' because '%s' was matched from SERVER_IGNORE_LIST", final_path,
            return "Ignoring scan request because %s was matched from your SERVER_IGNORE_LIST" % ignore_match
        if start_scan(final_path, 'manual', 'Manual'):
            return "'%s' was added to scan backlog." % final_path
            return "Error adding '%s' to scan backlog." % data['filepath']

    elif 'Movie' in data:"Client %r scan request for movie: '%s', event: '%s'", request.remote_addr,
                    data['Movie']['FilePath'], data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Movie']['FilePath'])
        start_scan(final_path, 'radarr', data['EventType'])
    elif 'movie' in data:
        # new radarr webhook
        path = os.path.join(data['movie']['folderPath'], data['movieFile']['relativePath'])"Client %r scan request for movie: '%s', event: '%s'", request.remote_addr, path,
                    "Upgrade" if data['isUpgrade'] else data['eventType'])
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'radarr', "Upgrade" if data['isUpgrade'] else data['eventType'])
    elif 'Series' in data:"Client %r scan request for series: '%s', event: '%s'", request.remote_addr, data['Series']['Path'],
        final_path = utils.map_pushed_path(conf.configs, data['Series']['Path'])
        start_scan(final_path, 'sonarr', data['EventType'])
    elif 'series' and 'episodeFile' in data:
        # new sonarr webhook
        path = os.path.join(data['series']['path'], data['episodeFile']['relativePath'])"Client %r scan request for series: '%s', event: '%s'", request.remote_addr, path,
                    "Upgrade" if data['isUpgrade'] else data['eventType'])
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'sonarr_dev', "Upgrade" if data['isUpgrade'] else data['eventType'])
        logger.error("Unknown scan request from: %r", request.remote_addr)

    return "OK"
