我们从Python开源项目中，提取了以下8个代码示例，用于说明如何使用flask.request.content_length属性。
def capture_request(self):
    """Store the JSON body of the current request on ``self.request_body``.

    Nothing is captured unless all of the following hold:

    * the application is running in debug mode;
    * ``self.options.include_request_body`` is truthy -- either ``True``
      (no size limit) or a number that ``request.content_length`` must
      stay strictly below;
    * the request body parses as JSON to a truthy value.
    """
    if not current_app.debug:
        # only capture request body on debug
        return

    include_body = self.options.include_request_body
    if not include_body:
        # only capture request body if requested
        return

    declared_size = request.content_length
    if (
        declared_size
        and include_body is not True
        and declared_size >= include_body
    ):
        # don't capture request body if it's too large
        return

    # Parse once and reuse the result instead of asking for the JSON body a
    # second time after the check has already succeeded.
    body = request.get_json(force=True, silent=True)
    if not body:
        # only capture request body if json
        return

    self.request_body = body
def upload_zip():
    """Collect the expected upload parameters from the submitted form.

    Every name in ``upload_request_param_list`` is looked up in
    ``request.form``; names that are not present keep the value ``None``.

    :raises Exception: If the form data cannot be read. The underlying
        error is chained as ``__cause__`` so it is not lost.
    """
    # Start with every expected parameter mapped to None.
    param_dict = dict.fromkeys(upload_request_param_list, None)
    # NOTE(review): neither of the next two values is used in the visible
    # part of this function -- presumably kept for timing / size reporting;
    # confirm against the full source before removing.
    start_time = time.time()
    file_size = request.content_length
    try:
        # Fill in the values that were actually submitted.
        parameter = request.form
        for param in upload_request_param_list:
            param_dict[param] = parameter.get(param)
    except Exception as exc:
        # A bare ``except:`` would also intercept KeyboardInterrupt and
        # SystemExit; catch Exception only and keep the original cause.
        raise Exception('Failed to read upload parameters') from exc
def post_file() -> JSONResponse[str]:
    """Store the raw request body in a temporary file on the server.

    .. :quickref: File; Save a file temporarily on the server.

    .. note::
        The posted data will be removed after 60 seconds.

    :returns: A response with the JSON serialized name of the file as
        content and return code 201.

    :raises APIException: If the request is bigger than the maximum upload
        size. (REQUEST_TOO_LARGE)
    :raises PermissionException: If there is no logged in user.
        (NOT_LOGGED_IN)
    """
    declared_size = request.content_length
    # A missing (or zero) Content-Length skips the size check entirely.
    if declared_size:
        upload_limit = app.config['MAX_UPLOAD_SIZE']
        if declared_size > upload_limit:
            raise APIException(
                'Uploaded file is too big.',
                'Request is bigger than maximum upload size of {}.'.format(
                    upload_limit
                ),
                APICodes.REQUEST_TOO_LARGE,
                400,
            )

    target_path, file_name = psef.files.random_file_path('MIRROR_UPLOAD_DIR')
    FileStorage(request.stream).save(target_path)

    return jsonify(file_name, status_code=201)
def before_request():
    """Reject requests whose declared Content-Length is above 1024.

    Requests without a Content-Length are let through.

    :raises ApiError: With status code 413 when the declared length is
        greater than 1024.
    """
    declared_length = request.content_length
    if declared_length is None or declared_length <= 1024:
        return

    raise ApiError(
        message='FROG CANNOT EXCEED MAXIMUM SIZE IN GIRTH, WIDTH OR LENGTH.',
        status_code=413,
    )
def validate_filesize(size):
    """Abort the request if the file is too large or empty.

    :param size: Size of the file. It is compared against
        ``config.max_file_size * 1024 * 1024`` -- presumably a byte count
        checked against a limit configured in MiB; confirm with callers.

    Aborts with 413 when ``size`` exceeds the limit, and with 411 when the
    request has no (or a zero) Content-Length or ``size`` is empty.
    """
    size_limit = config.max_file_size * 1024 * 1024

    # Comparing None with an int raises TypeError on Python 3; let a missing
    # size fall through to the "empty file" branch below instead.
    if size is not None and size > size_limit:
        abort(413)

    if not request.content_length or not size:
        logger.error('Request {} {} with empty file.'.format(request.method, request.path))
        abort(411)
def webpage_upload():
    """Handle a file uploaded through the web page.

    The upload is taken from the ``file[]`` form field, read and decoded as
    text, and handed to ``ManifestDestiny``. The decoded response data is
    reported back as the success message.

    :returns: The result of ``render_all`` -- called with ``okmsg`` on
        success and with ``errmsg`` when anything goes wrong.
    """
    max_request_size = 20000
    # Placeholder so the error message can still be built when the failure
    # happens before the uploaded file has been looked up.
    filename = '(unknown)'
    try:
        # Explicit checks instead of ``assert``: assertions are stripped
        # under ``python -O`` and ``int(None)`` would raise a TypeError when
        # the request carries no Content-Length at all.
        declared_size = request.content_length
        if declared_size is None:
            raise ValueError('Missing Content-Length')
        if int(declared_size) >= max_request_size:
            raise ValueError('Too big')

        upload = request.files['file[]']
        filename = upload.filename

        # The upload is processed in memory; it is not saved to disk.
        contentstr = upload.read().decode()
        m = ManifestDestiny('', '', contentstr)
        msg = m.response.data.decode()
        _load_data()
        return render_all(okmsg=msg + ': ' + filename)
    except Exception as e:
        return render_all(errmsg='Upload("%s") failed: %s' % (
            filename, str(e)))


###########################################################################
# API
# See blueprint registration in manifest_api.py, these are relative paths
def stream_to_storage(project_id: str):
    """Accept an uploaded file for a project and pass it on for processing.

    :param project_id: Id of the project the file is uploaded to.
    :returns: A JSON response built from the result of
        ``upload_and_process``; its HTTP status is taken from
        ``result['status_code']`` and access control headers are added.
    :raises werkzeug.exceptions.NotFound: If the project does not exist.
    :raises werkzeug.exceptions.BadRequest: If the uploaded file has no
        content type.
    """
    project_oid = utils.str2id(project_id)
    projects_coll = current_app.data.driver.db['projects']
    if not projects_coll.find_one(project_oid, projection={'_id': 1}):
        raise wz_exceptions.NotFound('Project %s does not exist' % project_id)

    log.info('Streaming file to bucket for project=%s user_id=%s', project_id,
             current_user.user_id)
    log.info('request.headers[Origin] = %r', request.headers.get('Origin'))
    log.info('request.content_length = %r', request.content_length)

    # Check the length of the whole body before touching request.files[],
    # so an oversized upload can be aborted early. The body is always a bit
    # larger than the file itself, so anything accepted here will also pass
    # the later checks.
    body_length = request.content_length
    if body_length:
        assert_file_size_allowed(body_length)

    uploaded_file = request.files['file']

    # The file part does not always carry its own Content-Length; when it
    # does, check it too. (The original author hoped this runs before the
    # body has been read in its entirety.)
    if uploaded_file.content_length:
        assert_file_size_allowed(uploaded_file.content_length)

    override_content_type(uploaded_file)
    content_type = uploaded_file.content_type
    if not content_type:
        log.warning('File uploaded to project %s without content type.', project_oid)
        raise wz_exceptions.BadRequest('Missing content type.')

    if content_type.startswith(('image/', 'video/')):
        # Images and videos need local thumbnailing and ffprobe, so the
        # stream is also written to a temporary file in local storage.
        local_file = tempfile.NamedTemporaryFile(
            dir=current_app.config['STORAGE_DIR'])
        uploaded_file.save(local_file)
        # Rewind so that the next reader starts from the beginning.
        local_file.seek(0)
    else:
        local_file = uploaded_file.stream

    result = upload_and_process(local_file, uploaded_file, project_id)

    resp = jsonify(result)
    resp.status_code = result['status_code']
    add_access_control_headers(resp)
    return resp
def create_file_doc_for_upload(project_id, uploaded_file):
    """Create a MongoDB document for an uploaded file under a fresh name.

    The internal filename is a random UUID hex string followed by the
    extension of the uploaded filename. Overwriting is not supported for
    now, so a new document is created on every call.

    :param project_id: id of the project; converted to an ``ObjectId``.
    :param uploaded_file: file from request.files['form-key']
    :type uploaded_file: werkzeug.datastructures.FileStorage
    :returns: a tuple (file_id, filename, status), where 'filename' is the
        internal filename used on GCS.
    :raises werkzeug.exceptions.InternalServerError: If the document could
        not be stored (status other than 200 or 201).
    """
    project_id = ObjectId(project_id)

    # The internal name is random; only the extension is taken from the
    # uploaded filename.
    extension = os.path.splitext(uploaded_file.filename)[1]
    internal_filename = uuid.uuid4().hex + extension

    # For now, we don't support overwriting files, and create a new one
    # every time, so no pre-existing file document is looked up:
    # files = current_app.data.driver.db['files']
    # file_doc = files.find_one({'project': project_id,
    #                            'name': internal_filename})
    file_doc = None

    # TODO: at some point do name-based and content-based content-type sniffing.
    new_props = {
        'filename': uploaded_file.filename,
        'content_type': uploaded_file.mimetype,
        'length': uploaded_file.content_length,
        'project': project_id,
        'status': 'uploading',
    }

    if file_doc is not None:
        # Update path, kept for when overwriting is supported again.
        file_doc.update(new_props)
        response = current_app.put_internal('files', remove_private_keys(file_doc))
    else:
        # Create a file document on MongoDB for this file.
        file_doc = create_file_doc(name=internal_filename, **new_props)
        response = current_app.post_internal('files', file_doc)
    file_fields, _, _, status = response

    if status not in (200, 201):
        log.error('Unable to create new file document in MongoDB, status=%i: %s',
                  status, file_fields)
        raise wz_exceptions.InternalServerError()

    log.debug('Created file document %s for uploaded file %s; internal name %s',
              file_fields['_id'], uploaded_file.filename, internal_filename)

    return file_fields['_id'], internal_filename, status