我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用werkzeug.datastructures.FileStorage()。
def _make_zip(self, project, ty):
    """Zip the JSON export of ``project`` and hand it to the uploader.

    The JSON payload is written to a temporary file, zipped into a second
    temporary file, and that archive is uploaded to the container named
    ``user_<owner_id>``. Nothing happens when ``self._respond_json``
    returns ``None``.

    :param project: object whose ``id`` and ``owner_id`` are read.
    :param ty: export type label; part of the archive member name.
    """
    name = self._project_name_latin_encoded(project)
    payload = self._respond_json(ty, project.id)
    if payload is None:
        return

    # Both temporary files are removed when their ``with`` block exits.
    with tempfile.NamedTemporaryFile() as json_file:
        json_file.write(json.dumps(payload))
        json_file.flush()
        with tempfile.NamedTemporaryFile() as zip_file:
            archive = self._zip_factory(zip_file.name)
            archive.write(json_file.name,
                          secure_filename('%s_%s.json' % (name, ty)))
            archive.close()
            target = "user_%d" % project.owner_id
            upload = FileStorage(filename=self.download_name(project, ty),
                                 stream=zip_file)
            uploader.upload_file(upload, container=target)
def _make_zip(self, project, ty):
    """Zip the CSV export of ``project`` and hand it to the uploader.

    Lines produced by ``self._respond_csv`` are written to a temporary
    file, zipped into a second temporary file, and that archive is uploaded
    to the container named ``user_<owner_id>``. Nothing happens when
    ``self._respond_csv`` returns ``None``.

    :param project: object whose ``id`` and ``owner_id`` are read.
    :param ty: export type label; part of the archive member name.
    """
    name = self._project_name_latin_encoded(project)
    rows = self._respond_csv(ty, project.id)
    if rows is None:
        return

    # TODO: use temp file from csv generation directly
    with tempfile.NamedTemporaryFile() as csv_file:
        csv_file.writelines(str(row) for row in rows)
        csv_file.flush()
        # Per the original author's note, closing the generator deletes
        # its own temporary CSV file.
        rows.close()
        with tempfile.NamedTemporaryFile() as zip_file:
            archive = self._zip_factory(zip_file.name)
            archive.write(csv_file.name,
                          secure_filename('%s_%s.csv' % (name, ty)))
            archive.close()
            target = "user_%d" % project.owner_id
            upload = FileStorage(filename=self.download_name(project, ty),
                                 stream=zip_file)
            uploader.upload_file(upload, container=target)
def test_crop(self):
    """Test UPLOADER crop works."""
    # Function-scope import: the module's import block is not visible here.
    import shutil

    u = Uploader()
    folder = tempfile.mkdtemp()
    try:
        u.upload_folder = folder
        image_path = os.path.join(folder, 'image.png')
        Image.new('RGB', (100, 100)).save(image_path)
        coordinates = (0, 0, 50, 50)
        upload = FileStorage(filename=image_path)

        with patch('pybossa.uploader.Image', return_value=True):
            err_msg = "It should crop the image"
            assert u.crop(upload, coordinates) is True, err_msg

        # An IOError while opening the image must be reported as False.
        with patch('pybossa.uploader.Image.open', side_effect=IOError):
            err_msg = "It should return false"
            assert u.crop(upload, coordinates) is False, err_msg
    finally:
        # mkdtemp() never cleans up after itself; remove the directory so
        # repeated test runs do not accumulate temp folders.
        shutil.rmtree(folder, ignore_errors=True)
def test_environ_builder_unicode_file_mix(self):
    """Round-trip a unicode form value plus a file through multipart.

    Runs once with an in-memory stream and once with a temp-file backed
    stream, then parses the encoded body back and compares the fields.
    """
    snowman = u'\N{SNOWMAN}'
    snowman_bytes = snowman.encode('utf-8')

    for use_tempfile in (False, True):
        upload = FileStorage(BytesIO(snowman_bytes), 'snowman.txt')
        fields = MultiDict(dict(f=upload, s=snowman))
        stream, length, boundary = stream_encode_multipart(
            fields, use_tempfile, threshold=150)
        # The stream is a BytesIO exactly when no temp file was requested.
        self.assert_true(isinstance(stream, BytesIO) != use_tempfile)

        environ = {
            'wsgi.input': stream,
            'CONTENT_LENGTH': str(length),
            'CONTENT_TYPE': 'multipart/form-data; boundary="%s"' % boundary
        }
        _, form, files = parse_form_data(environ)
        parsed = files['f']

        self.assert_strict_equal(form['s'], snowman)
        self.assert_strict_equal(parsed.name, 'f')
        self.assert_strict_equal(parsed.filename, u'snowman.txt')
        self.assert_strict_equal(parsed.read(), snowman_bytes)
        stream.close()
def __call__(self, form, field):
    """Stop validation when the uploaded file's extension is not allowed.

    Fields whose data is not a truthy ``FileStorage`` are skipped. When
    ``self.upload_set`` is an iterable, the lower-cased filename must end
    with ``'.' + extension`` for one of its items; otherwise the decision
    is delegated to ``self.upload_set.file_allowed``.

    :raises StopValidation: with ``self.message`` or a default message.
    """
    data = field.data
    if not isinstance(data, FileStorage) or not data:
        return

    filename = data.filename.lower()

    if not isinstance(self.upload_set, Iterable):
        if self.upload_set.file_allowed(data, filename):
            return
        raise StopValidation(self.message or field.gettext(
            'File does not have an approved extension.'
        ))

    if any(filename.endswith('.' + ext) for ext in self.upload_set):
        return

    # Only the default message is formatted; a custom message is used as is.
    raise StopValidation(self.message or field.gettext(
        'File does not have an approved extension: {extensions}'
    ).format(extensions=', '.join(self.upload_set)))
def convert(self, value, op):
    """Coerce ``value`` through ``self.type``.

    ``None`` is returned untouched when ``self.nullable`` is set and is
    rejected otherwise. A ``FileStorage`` value is passed through when
    ``self.type`` is ``FileStorage`` itself. Anything else is fed to
    ``self.type``, trying the ``(value, name, op)``, ``(value, name)`` and
    ``(value)`` call shapes in that order and falling back on ``TypeError``.

    :raises ValueError: if ``value`` is ``None`` and nulls are not allowed.
    """
    # Never cast None.
    if value is None:
        if not self.nullable:
            raise ValueError('Must not be null!')
        return None

    # An un-overridden FileStorage type cannot be instantiated the way the
    # calls below do it, so hand the upload back unchanged.
    if isinstance(value, FileStorage) and self.type == FileStorage:
        return value

    try:
        return self.type(value, self.name, op)
    except TypeError:
        try:
            # Decimal receives the string form of the value.
            raw = str(value) if self.type is decimal.Decimal else value
            return self.type(raw, self.name)
        except TypeError:
            return self.type(value)
def test_rackspace_uploader_upload_correct_file(self, mock, mock2):
    """Test RACKSPACE UPLOADER upload file works."""
    with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
        mycf.upload_file.return_value = True
        mycf.get_object.side_effect = NoSuchObject

        uploader = RackspaceUploader()
        uploader.init_app(self.flask_app)
        upload = FileStorage(filename='test.jpg')

        outcome = uploader.upload_file(upload, container='user_3')
        assert outcome is True, "Upload file should return True"

        # The container is fetched and the object is looked up by name.
        expected = [
            call.get_container('user_3'),
            call.get_container().get_object('test.jpg'),
        ]
        mycf.assert_has_calls(expected, any_order=True)
def test_rackspace_uploader_upload_correct_purgin_first_file(self, mock, mock2):
    """Test RACKSPACE UPLOADER upload file purging first file works."""
    with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
        mycf.upload_file.return_value = True
        mycf.get_object.side_effect = True
        u = RackspaceUploader()
        u.init_app(self.flask_app)
        file = FileStorage(filename='test.jpg')
        err_msg = "Upload file should return True"
        assert u.upload_file(file, container='user_3') is True, err_msg
        # The existing object is deleted before the new one is looked up.
        calls = [call.get_container('user_3'),
                 call.get_container().get_object().delete(),
                 call.get_container().get_object('test.jpg')]
        # The leftover debug ``print`` statement was dropped: it was
        # Python-2-only syntax and only added noise to the test output.
        mycf.assert_has_calls(calls, any_order=True)
def test_rackspace_uploader_upload_file_fails(self, mock, mock2):
    """Test RACKSPACE UPLOADER upload file fail works."""
    with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
        from pyrax.exceptions import UploadFailed

        # Make the backend upload raise.
        mycf.upload_file.side_effect = UploadFailed

        uploader = RackspaceUploader()
        uploader.init_app(self.flask_app)
        upload = FileStorage(filename='test.jpg')

        outcome = uploader.upload_file(upload, container='user_3')
        assert outcome is False, "Upload file should return False"
def test_rackspace_uploader_upload_wrong_file(self, mock, mock2):
    """Test RACKSPACE UPLOADER upload wrong file extension works."""
    with patch('pybossa.uploader.rackspace.pyrax.cloudfiles') as mycf:
        mycf.upload_file.return_value = True

        uploader = RackspaceUploader()
        uploader.init_app(self.flask_app)
        # Filename with the '.docs' extension this test treats as wrong.
        upload = FileStorage(filename='test.docs')

        outcome = uploader.upload_file(upload, container='user_3')
        assert outcome is False, "Upload file should return False"
def test_local_uploader_upload_fails(self, mock):
    """Test LOCAL UPLOADER upload fails."""
    u = LocalUploader()
    file = FileStorage(filename='test.jpg')
    res = u.upload_file(file, container='user_3')
    # Adjacent literals instead of a backslash continuation inside the
    # string, which embedded the source indentation in the message.
    err_msg = ("Upload file should return False, "
               "as there is an exception")
    assert res is False, err_msg
def test_local_uploader_upload_correct_file(self, mock):
    """Test LOCAL UPLOADER upload works."""
    # Make the patched save() return None.
    mock.save.return_value = None
    u = LocalUploader()
    file = FileStorage(filename='test.jpg')
    res = u.upload_file(file, container='user_3')
    # Adjacent literals instead of a backslash continuation inside the
    # string, which embedded the source indentation in the message.
    err_msg = ("Upload file should return True, "
               "as this extension is allowed")
    assert res is True, err_msg
def test_local_uploader_upload_wrong_file(self, mock):
    """Test LOCAL UPLOADER upload works with wrong extension."""
    mock.save.return_value = None
    u = LocalUploader()
    # Filename with the '.txt' extension this test expects to be rejected.
    file = FileStorage(filename='test.txt')
    res = u.upload_file(file, container='user_3')
    # Adjacent literals instead of a backslash continuation inside the
    # string, which embedded the source indentation in the message.
    err_msg = ("Upload file should return False, "
               "as this extension is not allowed")
    assert res is False, err_msg
def test_file_exists_for_real_file(self):
    """Test LOCAL UPLOADER file_exists returns True if the file exists"""
    # Function-scope import: the module's import block is not visible here.
    import shutil

    u = LocalUploader()
    u.upload_folder = tempfile.mkdtemp()
    try:
        file = FileStorage(filename='test.jpg')
        container = 'mycontainer'
        u.upload_file(file, container=container)
        assert u.file_exists('test.jpg', container) is True
    finally:
        # mkdtemp() leaves cleanup to the caller; remove the upload folder
        # so the test does not leak a directory on every run.
        shutil.rmtree(u.upload_folder, ignore_errors=True)
def process_formdata(self, valuelist):
    """Store the first non-empty ``FileStorage`` from ``valuelist``.

    When no item is a truthy ``FileStorage``, ``self.data`` is left alone
    and ``self.raw_data`` is reset to an empty tuple.
    """
    for candidate in valuelist:
        if isinstance(candidate, FileStorage) and candidate:
            self.data = candidate
            return

    self.raw_data = ()
def has_file(self):
    """Return the truthiness of ``self.data`` after emitting a warning.

    .. deprecated:: 0.14.1
        ``data`` is no longer set if the input is not a non-empty
        ``FileStorage``. Check ``form.data is not None`` instead.
    """
    deprecation = FlaskWTFDeprecationWarning(
        '"has_file" is deprecated and will be removed in 1.0. The data is '
        'checked during processing instead.'
    )
    warnings.warn(deprecation)
    return True if self.data else False
def __call__(self, form, field):
    """Stop validation unless the field holds a non-empty ``FileStorage``.

    :raises StopValidation: with ``self.message`` when it is set, else
        with the field's translation of 'This field is required.'.
    """
    data = field.data
    if isinstance(data, FileStorage) and data:
        return

    message = self.message
    if message is None:
        message = field.gettext('This field is required.')
    raise StopValidation(message)
def setUp(self):
    """Create the Flask app, its extensions and a dummy PNG upload."""
    app = Flask(__name__)
    app.config.update({
        "SECRET_KEY": "test",
        "WTF_CSRF_ENABLED": False,
        'TESTING': True,
        "FILEUPLOAD_ALLOWED_EXTENSIONS": ["png", "jpg"],
        "FILEUPLOAD_CONVERT_TO_SNAKE_CASE": True,
    })
    self.app = app

    # Extensions are bound after the configuration is in place.
    self.login_manager = LoginManager(app)
    self.ffu = FlaskFileUpload(app)

    self.dummy_stream = io.BytesIO(b"some initial text data")
    self.fs = FileStorage(self.dummy_stream, "dummy.png")
def is_archive(file: FileStorage) -> bool:
    """Checks whether the file ends with a known archive file extension.

    The check is a plain suffix match of ``file.filename`` against
    ``_known_archive_extensions``.

    :param file: Some file
    :returns: True if the file has a known extension, False if it does not
        or if the file has no filename at all.
    """
    filename = file.filename
    # An upload may carry no filename; treat that as "not an archive"
    # instead of failing with an AttributeError on ``None``.
    if not filename:
        return False
    return filename.endswith(_known_archive_extensions)
def extract_to_temp(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> str:
    """Extracts the contents of file into a temporary directory.

    The upload is first saved next to a ``mkstemp`` placeholder under a
    name that ends in the sanitized original filename, then extracted with
    the ``safe`` method. The saved archive is always removed afterwards;
    the extraction directory is removed as well if any step fails.

    :param file: The archive to extract.
    :param ignore_filter: The files and directories that should be ignored.
    :param handle_ignore: Determines how ignored files should be handled.
    :returns: The pathname of the new temporary directory.
    :raises IgnoredFilesException: If ``handle_ignore`` is
        ``IgnoreHandling.error`` and the archive contains ignored files.
    """
    # Function-scope import: the module's import block is not visible here.
    import shutil

    tmpfd, tmparchive = tempfile.mkstemp()
    tmpdir = None
    succeeded = False
    try:
        # Drop the placeholder and reuse its unique path as a prefix, so
        # the saved archive keeps the upload's (sanitized) name.
        os.remove(tmparchive)
        tmparchive += os.path.basename(
            secure_filename('archive_' + file.filename)
        )
        tmpdir = tempfile.mkdtemp()
        file.save(tmparchive)

        if handle_ignore == IgnoreHandling.error:
            arch = archive.Archive(tmparchive)
            wrong_files = ignore_filter.get_ignored_files_in_archive(arch)
            if wrong_files:
                raise IgnoredFilesException(invalid_files=wrong_files)
            arch.extract(to_path=tmpdir, method='safe')
        else:
            archive.extract(tmparchive, to_path=tmpdir, method='safe')
            if handle_ignore == IgnoreHandling.delete:
                ignore_filter.delete_from_dir(tmpdir)
        succeeded = True
    finally:
        os.close(tmpfd)
        try:
            os.remove(tmparchive)
        except FileNotFoundError:
            # The archive was never written (failure before ``save``);
            # do not let cleanup mask the original exception.
            pass
        if not succeeded and tmpdir is not None:
            # The caller never receives the path on failure, so nobody
            # else could clean this directory up.
            shutil.rmtree(tmpdir, ignore_errors=True)
    return tmpdir
def extract(
    file: FileStorage,
    ignore_filter: IgnoreFilterManager = None,
    handle_ignore: IgnoreHandling = IgnoreHandling.keep
) -> t.Optional[ExtractFileTree]:
    """Extract the given archive and return its contents as a file tree.

    The archive is unpacked with :py:func:`extract_to_temp`, the result is
    passed through :py:func:`rename_directory_structure`, and the
    temporary directory is removed before returning.

    :param werkzeug.datastructures.FileStorage file: The file to extract.
    :param ignore_filter: What files should be ignored in the given archive.
        This can only be None when ``handle_ignore`` is
        ``IgnoreHandling.keep``.
    :param handle_ignore: How should ignored file be handled.
    :returns: ``None`` when the extracted tree is empty. A single top-level
        mapping is returned as is; any other content is wrapped in a
        mapping keyed by the filename up to its first ``.``.
    :raises ValueError: If no filter is given for a non-``keep`` handling.
    """
    if ignore_filter is None:
        if handle_ignore == IgnoreHandling.keep:
            ignore_filter = IgnoreFilterManager([])
        else:  # pragma: no cover
            raise ValueError

    tmpdir = extract_to_temp(file, ignore_filter, handle_ignore)
    # Index of the last path component of the temporary directory.
    start = tmpdir.rstrip(os.sep).rfind(os.sep) + 1

    try:
        tree = rename_directory_structure(tmpdir)[tmpdir[start:]]
        filename: str = file.filename.partition('.')[0]

        if not tree:
            return None
        if len(tree) > 1:
            wrapped = tree if isinstance(tree, list) else [tree]
            return {filename: wrapped}

        first = tree[0]
        if isinstance(first, t.MutableMapping):
            return first
        return {filename: tree}
    finally:
        shutil.rmtree(tmpdir)
def test_call_file(self):
    """``_call`` on ``handler.file`` yields the uploaded file's name."""
    handler = TestBaseHandler.DummyHandler()
    request = Request(Environ({"wsgi.input": None}))
    upload = FileStorage(filename="foo")
    request.files = {"file": upload}

    result = handler._call(handler.file, {}, request)

    self.assertEqual("foo", result)
def test_get_file_name(self):
    """``_get_file_name`` returns the filename of the uploaded file."""
    request = Request(Environ())
    upload = FileStorage(filename="foo")
    request.files = {"file": upload}

    found = BaseHandler._get_file_name(request)

    self.assertEqual("foo", found)
def test_get_file_content(self):
    """``_get_file_content`` returns the bytes of the uploaded stream."""
    request = Request(Environ())
    payload = _io.BytesIO("hello world".encode())
    upload = FileStorage(stream=payload, filename="foo")
    request.files = {"file": upload}

    content = BaseHandler._get_file_content(request)

    self.assertEqual("hello world", content.decode())
def test_save_file(self):
    """Saving an upload and thumbnailing it yields a 150x100 image."""
    file_path_fixture = self.get_fixture_file_path("thumbnails/th01.png")
    # Open the fixture in a ``with`` block so the handle is closed once
    # the upload has been saved, instead of leaking until collection.
    with open(file_path_fixture, "rb") as fixture_stream:
        th_file = FileStorage(
            stream=fixture_stream,
            filename="th01.png"
        )
        full_path = thumbnail.save_file("shots", "instance-id", th_file)

    thumbnail.turn_into_thumbnail(full_path, thumbnail.RECTANGLE_SIZE)
    im = Image.open(full_path)
    (width, height) = im.size
    self.assertEqual(width, 150)
    self.assertEqual(height, 100)
def _attachments_as_dict(
        filestorages: Iterable[FileStorage],
        attachment_encoder: AttachmentEncoder) -> Iterable[dict]:
    """Yield a ``filename``/``content`` dict for each usable attachment.

    Every stream is read and encoded with ``attachment_encoder``; entries
    whose filename or encoded content is falsy are skipped.
    """
    for storage in filestorages:
        name = storage.filename
        # The stream is consumed even when the entry ends up skipped.
        encoded = attachment_encoder.encode(storage.stream.read())
        if not (name and encoded):
            continue
        yield {'filename': name, 'content': encoded}