我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.files.File()。
def setUp(self): self.superuser = create_superuser() self.client.login(username='admin', password='secret') self.img = create_image() self.image_name = 'test_file.jpg' self.filename = os.path.join(settings.FILE_UPLOAD_TEMP_DIR, self.image_name) self.img.save(self.filename, 'JPEG') self.file = DjangoFile(open(self.filename, 'rb'), name=self.image_name) # This is actually a "file" for filer considerations self.image = Image.objects.create(owner=self.superuser, original_filename=self.image_name, file=self.file) self.clipboard = Clipboard.objects.create(user=self.superuser) self.clipboard.append_file(self.image) self.folder = Folder.objects.create(name='test_folder')
def test_file_change_upload_to_destination(self): """ Test that the file is actualy move from the private to the public directory when the is_public is checked on an existing private file. """ file_obj = DjangoFile(open(self.filename, 'rb'), name=self.image_name) image = Image.objects.create(owner=self.superuser, is_public=False, original_filename=self.image_name, file=file_obj) image.save() self.assertTrue(image.file.path.startswith(filer_settings.FILER_PRIVATEMEDIA_STORAGE.location)) image.is_public = True image.save() self.assertTrue(image.file.path.startswith(filer_settings.FILER_PUBLICMEDIA_STORAGE.location)) self.assertEqual(len(image.icons), len(filer_settings.FILER_ADMIN_ICON_SIZES)) image.is_public = False image.save() self.assertTrue(image.file.path.startswith(filer_settings.FILER_PRIVATEMEDIA_STORAGE.location)) self.assertEqual(len(image.icons), len(filer_settings.FILER_ADMIN_ICON_SIZES))
def save(self, name, content, max_length=None): """ Saves new content to the file specified by name. The content should be a proper File object or any python file-like object, ready to be read from the beginning. """ # Get the proper name for the file, as it will actually be saved. if name is None: name = content.name if not hasattr(content, 'chunks'): content = File(content, name) name = self.get_available_name(name, max_length=max_length) return self._save(name, content) # These methods are part of the public API, with default implementations.
def _get_encoded_attachments(self): attachments = self.get_attachments() if attachments: new_attachments = [] for attachment in attachments: if isinstance(attachment, File): attachment.seek(0) new_attachments.append((attachment.name, attachment.read(), guess_type(attachment.name)[0])) else: new_attachments.append(attachment) attachments = new_attachments return jsonpickle.dumps(attachments)
def archive(self, processed_file): """ Write the .csv file and upload a copy to the archive. """ # Remove previous .CSV files processed_file.file_archive.delete() # Export a new one processed_file.make_csv_copy() # Open up the .CSV file for reading so we can wrap it in the Django File obj with open(processed_file.csv_path, 'rb') as csv_file: # Save the .CSV on the processed data file processed_file.file_archive.save( '%s.csv' % self.model_name, File(csv_file), ) # Save it to the model processed_file.file_size = os.path.getsize(processed_file.csv_path) processed_file.save() return
def put(self, request): try: zip_file = request.FILES['file'] archive = zipfile.ZipFile(zip_file) except (MultiValueDictKeyError, zipfile.BadZipfile): raise NotZIPFileError try: csv_name = [item for item in archive.namelist() if item.endswith('csv')][0] except IndexError: raise NoCSVInArchiveFoundError with archive.open(csv_name) as zip_csv_file: # Convert zipfile handle to Django file handle csv_file = File(zip_csv_file) dataset = Dataset.objects.create( name=zip_csv_file.name, content=csv_file, uploaded_by=request.user) # Start tasks for feature calculation initialize_from_dataset.delay(dataset_id=dataset.id) serializer = DatasetSerializer(instance=dataset) return Response(serializer.data)
def save(self, *args: typing.Any, **kwargs: typing.Any) -> None: super(Gallery, self).save(*args, **kwargs) if self.thumbnail_url and not self.thumbnail: response = request_with_retries( self.thumbnail_url, { 'timeout': 25, 'stream': True }, post=False, ) if response: disassembled = urlparse(self.thumbnail_url) file_name = basename(disassembled.path) lf = NamedTemporaryFile() if response.status_code == requests.codes.ok: for chunk in response.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks lf.write(chunk) self.thumbnail.save(file_name, File(lf), save=False) lf.close() super(Gallery, self).save(force_update=True)
def save_img(self, img_link: str) -> None: tf2 = NamedTemporaryFile() request_file = requests.get(img_link, stream='True', timeout=25) for chunk in request_file.iter_content(4096): tf2.write(chunk) self.image.save(os.path.splitext(img_link)[1], File(tf2), save=False) tf2.close() im = PImage.open(self.image.path) if im.mode != 'RGB': im = im.convert('RGB') # large thumbnail im.thumbnail((200, 290), PImage.ANTIALIAS) thumb_relative_path = upload_announce_thumb_handler(self, os.path.splitext(img_link)[1]) thumb_fn = pjoin(settings.MEDIA_ROOT, thumb_relative_path) os.makedirs(os.path.dirname(thumb_fn), exist_ok=True) im.save(thumb_fn, "JPEG") self.thumbnail.name = thumb_relative_path self.save()
def copy_img(self, img_path: str) -> None: tf2 = NamedTemporaryFile() shutil.copy(img_path, tf2.name) self.image.save(os.path.splitext(img_path)[1], File(tf2), save=False) tf2.close() im = PImage.open(self.image.path) if im.mode != 'RGB': im = im.convert('RGB') # large thumbnail im.thumbnail((200, 290), PImage.ANTIALIAS) thumb_relative_path = upload_announce_thumb_handler(self, os.path.splitext(img_path)[1]) thumb_fn = pjoin(settings.MEDIA_ROOT, thumb_relative_path) os.makedirs(os.path.dirname(thumb_fn), exist_ok=True) im.save(thumb_fn, "JPEG") self.thumbnail.name = thumb_relative_path self.save()
def set_image(instance, field, width=1920, height=1440): """ FileField / ImageField """ manager = getattr(instance, field.name) try: image_type = random.choice(['people', 'places', 'things']) response = requests.get( 'https://placem.at/%s?w=%d&h=%d&random=1&txt=' % (image_type, width, height), timeout=5, stream=True ) except (ConnectionError, Timeout): response = requests.get('http://baconmockup.com/%d/%d/' % (width, height), stream=True) tfp = tempfile.NamedTemporaryFile(delete=False) with tfp: for chunk in response.iter_content(1024 * 1024): tfp.write(chunk) tfp.seek(0) manager.save('image.jpg', File(tfp), save=False)
def save(self, commit=True): instance = super(InspirationQuoteForm, self).save(commit=True) if self.cleaned_data["delete_picture"] and instance.picture: instance.picture.delete() if self.cleaned_data["picture_path"]: tmp_path = self.cleaned_data["picture_path"] abs_tmp_path = os.path.join(settings.MEDIA_ROOT, tmp_path) filename = InspirationQuote._meta.get_field("picture").upload_to(instance, tmp_path) instance.picture.save(filename, File(open(abs_tmp_path, "rb")), False) os.remove(abs_tmp_path) instance.save() return instance
def test_get_model_with_file(self): emmen = Zoo(name='Wildlands Adventure Zoo Emmen') with temp_imagefile(100, 200, 'jpeg') as file: emmen.floor_plan.save('plan.jpg', File(file), save=False) emmen.save() response = self.client.get('/zoo/%d/' % emmen.id) self.assertEqual(response.status_code, 200) result = jsonloads(response.content) self.assertEqual(emmen.id, result['data']['id']) self.assertEqual(emmen.name, result['data']['name'], 'Wildlands Adventure Zoo Emmen') self.assertEqual('/zoo/%d/floor_plan/' % emmen.id, result['data']['floor_plan']) # This is a basic regression test for a bug due to the router # singleton refactor, GET would crash if the model simply # _contained_ a file attribute.
def test_get_related_model_with_file(self): emmen = Zoo(name='Wildlands Adventure Zoo Emmen') with temp_imagefile(100, 200, 'jpeg') as file: emmen.floor_plan.save('plan.jpg', File(file), save=False) emmen.save() donald = Animal(name='Donald Duck', zoo=emmen) donald.save() response = self.client.get('/animal/%d/' % donald.id, data={'with': 'zoo'}) self.assertEqual(response.status_code, 200) result = jsonloads(response.content) self.assertEqual(donald.id, result['data']['id']) self.assertEqual({'zoo': 'zoo'}, result['with_mapping']) zoo = result['with']['zoo'][0] self.assertEqual(emmen.id, zoo['id']) self.assertEqual(emmen.name, zoo['name'], 'Wildlands Adventure Zoo Emmen') self.assertEqual('/zoo/%d/floor_plan/' % emmen.id, zoo['floor_plan']) # Same as above, but in multi-put's code path
def test_multi_put_model_with_existing_file(self): emmen = Zoo(name='Wildlands Adventure Zoo Emmen') with temp_imagefile(100, 200, 'jpeg') as file: emmen.floor_plan.save('plan.jpg', File(file), save=False) emmen.save() model_data = { 'data': [{ 'id': emmen.id, 'name': 'Wildlands!', }] } response = self.client.put('/zoo/', data=json.dumps(model_data), content_type='application/json') self.assertEqual(response.status_code, 200)
def handle(self, *args, **options): base_dir = os.path.dirname(os.path.abspath(__file__)) dictionaries_path = os.path.join(base_dir, self.DATA_DIR) instruments_dirs = os.listdir(dictionaries_path) for instrument_dir in instruments_dirs: # ?????????? ??????? ????? ? ?????. if not instrument_dir.startswith('.'): instrument, _ = Instrument.objects.get_or_create(name=instrument_dir) sounds_path = os.path.join(dictionaries_path, instrument_dir) sounds_names = [ sound_name for sound_name in os.listdir(sounds_path) if os.path.isfile(os.path.join(sounds_path, sound_name)) and not sound_name.startswith('.') ] for sound_name in sounds_names: with open(os.path.join(sounds_path, sound_name), 'rb') as sound_file: wrapped_file = File(sound_file) sound, _ = Sound.objects.get_or_create( name=os.path.splitext(sound_name)[0], instrument=instrument ) sound.file = wrapped_file sound.save() self.stdout.write(self.style.SUCCESS('Successfully created'))
def get_or_import_file(self, options): assert self.post_type == 'attachment' if self.file: return self.file # download content into deleted temp_file temp_file = NamedTemporaryFile(delete=True) temp_file.write(urlopen(force_bytes(self.guid)).read()) temp_file.flush() # create DjangoFile object django_file = DjangoFile(temp_file, name=self.guid.split('/')[-1]) # choose folder if self.parent: folder = self.parent.get_or_create_folder(options) else: folder = options.file_folder # import file self.file = FileImporter().import_file(file_obj=django_file, folder=folder) # set date and owner self.file.created_at = self.pub_date self.file.owner = self.created_by.user self.file.save() # return imported file self.save() return self.file
def _make_bound_form(email, file_attached=False, course="", program="", course_mode="", notify=""): """ Builds bound ManageLearnersForm. """ form_data = { ManageLearnersForm.Fields.EMAIL_OR_USERNAME: email, ManageLearnersForm.Fields.COURSE: course, ManageLearnersForm.Fields.PROGRAM: program, ManageLearnersForm.Fields.COURSE_MODE: course_mode, ManageLearnersForm.Fields.NOTIFY: notify, } file_data = {} if file_attached: mock_file = mock.Mock(spec=File) mock_file.name = "some_file.csv" mock_file.read.return_value = "fake file contents" file_data = {ManageLearnersForm.Fields.BULK_UPLOAD: mock_file} customer = EnterpriseCustomerFactory( catalog=99, ) return ManageLearnersForm(form_data, file_data, enterprise_customer=customer)
def test_image_size(self, is_valid_image_size, image_size): """ Test image size in KB's, image_size < 512 KB. Default valid max image size is 512 KB (512 * 1024 bytes). See config `valid_max_image_size` in apps.py. """ file_mock = mock.MagicMock(spec=File, name="FileMock") file_mock.name = "test1.png" file_mock.size = image_size * 1024 # image size in bytes branding_configuration = EnterpriseCustomerBrandingConfiguration( enterprise_customer=factories.EnterpriseCustomerFactory(), logo=file_mock ) if not is_valid_image_size: with self.assertRaises(ValidationError) as validation_error: branding_configuration.full_clean() expected_validation_message = 'The logo image file size must be less than or equal to 512 KB.' self.assertEqual(validation_error.exception.messages[0], expected_validation_message) else: branding_configuration.full_clean() # exception here will fail the test
def test_image_type(self, is_valid_image_extension, image_extension): """ Test image type, currently .png is supported in configuration. see apps.py. """ file_mock = mock.MagicMock(spec=File, name="FileMock") file_mock.name = "test1" + image_extension file_mock.size = 2 * 1024 branding_configuration = EnterpriseCustomerBrandingConfiguration( enterprise_customer=factories.EnterpriseCustomerFactory(), logo=file_mock ) if not is_valid_image_extension: with self.assertRaises(ValidationError): branding_configuration.full_clean() else: branding_configuration.full_clean() # exception here will fail the test
def store_temp_file(fd): """ Given a file-like object and dumps it to the temporary directory. Returns the temporary file object. """ temp_file = tempfile.NamedTemporaryFile( encoding=getattr(fd, 'encoding', None)) source = FileWrapper(fd) for chunk in source.chunks(): temp_file.write(chunk) temp_file.seek(0) return temp_file
def handle(self, *args, **options): logger = logger_setup('mgmt_load_source_file') if (len(args) < 2): raise CommandError('See "./manage.py help load_source_file" for usage') if args[0] not in SourceFile.FILE_TYPES: raise CommandError('"{}" is not a valid file type. Options are: {}'.format(args[0], ', '.join(SourceFile.FILE_TYPES.keys()))) file_type = SourceFile.FILE_TYPES[args[0]] files = args[1:] for f in files: if not os.path.isfile(f): raise CommandError('"{}" is not a file path, aborting'.format(f)) for f in files: if os.path.getsize(f) > 0: source_file, created = SourceFile.objects.get_or_create(file_type=file_type, file_name=os.path.basename(f)) if not created: source_file.data.delete() source_file.data.save(os.path.basename(f), File(open(f, 'rb')), save=True) logger.info('SourceFile id {} ({}, {}) updated'.format(source_file.id, args[0], os.path.basename(f))) else: logger.info('SourceFile id {} IGNORED: File size zero'.format(os.path.basename(f)))
def save(self, name, content): """ Saves new content to the file specified by name. The content should be a proper File object or any python file-like object, ready to be read from the beginning. """ # Get the proper name for the file, as it will actually be saved. if name is None: name = content.name if not hasattr(content, 'chunks'): content = File(content) name = self.get_available_name(name) name = self._save(name, content) # Store filenames with forward slashes, even on Windows return force_text(name.replace('\\', '/')) # These methods are part of the public API, with default implementations.
def create_new_thumb(media_path, instance, owner_slug, max_length, max_width): filename = os.path.basename(media_path) thumb = Image.open(media_path) size = (max_length, max_width) thumb.thumbnail(size, Image.ANTIALIAS) temp_loc = "%s/%s/tmp" %(settings.MEDIA_ROOT, owner_slug) if not os.path.exists(temp_loc): os.makedirs(temp_loc) temp_file_path = os.path.join(temp_loc, filename) if os.path.exists(temp_file_path): temp_path = os.path.join(temp_loc, "%s" %(random.random())) os.makedirs(temp_path) temp_file_path = os.path.join(temp_path, filename) temp_image = open(temp_file_path, "w") thumb.save(temp_image) thumb_data = open(temp_file_path, "r") thumb_file = File(thumb_data) instance.media.save(filename, thumb_file) shutil.rmtree(temp_loc, ignore_errors=True) return True
def transform(self, data): path = self.find_biggest_image(data['props']['srcset']) name = os.path.basename(path) image = Image( caption=data['props'].get('caption', ''), title=data['props']['alt'] or self.page.title ) # save temp file img_temp = NamedTemporaryFile() img_temp.write(self.get_image_data_from_file(path)) img_temp.flush() # save file and image image.file.save(name, File(img_temp), save=True) image.save() return { 'type': 'image', 'value': image.id }
def _serialize(self, value: Any) -> str: if isinstance(value, str): return value elif isinstance(value, int) or isinstance(value, float) \ or isinstance(value, bool) or isinstance(value, decimal.Decimal): return str(value) elif isinstance(value, list) or isinstance(value, dict): return json.dumps(value) elif isinstance(value, datetime) or isinstance(value, date) or isinstance(value, time): return value.isoformat() elif isinstance(value, Model): return value.pk elif isinstance(value, File): return 'file://' + value.name else: for t in self._h.types: if isinstance(value, t.type): return t.serialize(value) raise TypeError('Unable to serialize %s into a setting.' % str(type(value)))
def test_form_save_unchanged_file(organization): val = SimpleUploadedFile("sample_invalid_image.jpg", b"file_content", content_type="image/jpeg") form = SampleForm(obj=organization, attribute_name='settings', data={}, files={ 'test_file': val }) assert form.is_valid() form.save() organization.settings.flush() oldname = organization.settings.get('test_file', as_type=File, binary_file=True).name form = SampleForm(obj=organization, attribute_name='settings', data={'unaffected': 'on'}, files={}) assert form.is_valid() form.save() organization.settings.flush() assert organization.settings.get('test_file', as_type=File, binary_file=True).name == oldname
def test_form_delete_file(organization): val = SimpleUploadedFile("sample_invalid_image.jpg", b"file_content", content_type="image/jpeg") form = SampleForm(obj=organization, attribute_name='settings', data={}, files={ 'test_file': val }) assert form.is_valid() form.save() organization.settings.flush() oldname = organization.settings.get('test_file', as_type=File, binary_file=True).name form = SampleForm(obj=organization, attribute_name='settings', data={ 'test_file-clear': 'on' }) assert form.is_valid() form.save() organization.settings.flush() assert not organization.settings.test_file assert not os.path.exists(oldname)
def test_download_private_no_permission(self): #Create contact for the user profile = create_profile_contact(self.user) contact = profile.contact #Create category cat = ArticleCategory.objects.create(name="CAT") #create a public doc file_ = File(self._get_file()) doc = mommy.make(Document, is_private=True, file=file_, category=cat) #check the url private_url = reverse('coop_cms_download_doc', args=[doc.id]) self.assertEqual(doc.get_download_url(), private_url) #login and download response = self.client.get(doc.get_download_url(), follow=True) self.assertEqual(response.status_code, 200) #self.assertEquals(response['Content-Disposition'], "attachment; filename=unittest1.txt") self.assertEquals(response['Content-Type'], "text/plain")
def test_download_private_no_contact_defined(self): #Create category cat = ArticleCategory.objects.create(name="CAT") cat_perm = CategoryPermission.objects.create(category=cat) #create a public doc file_ = File(self._get_file()) doc = mommy.make(Document, is_private=True, file=file_, category=cat) #check the url private_url = reverse('coop_cms_download_doc', args=[doc.id]) self.assertEqual(doc.get_download_url(), private_url) #login and download response = self.client.get(doc.get_download_url()) self.assertEqual(response.status_code, 403)
def test_import_error(self): """it should create items properly: default categorie""" data_file = File(self._get_file('import_store_fields_no_brand_and_ref_only_1.xls')) #by default fields are name,brand,reference,purchase_price,vat_rate store_import = mommy.make( models.StoreItemImport, data=data_file, fields='pre_tax_price,vat_rate' ) store_import.import_data() self.assertNotEqual(store_import.import_error, '') self.assertEqual(store_import.is_successful, False) self.assertEqual(store_import.last_import_date.date(), date.today()) self.assertEqual(models.StoreItem.objects.count(), 0)
def get_employee_list(self): default_image = File(open('sample_data/default_avatar.png', 'rb')) employees = get_list_or_404(Employee) for employee in employees: employee.avatar.save('default.png', default_image)
def test_upload_endpoint_that_return_success(self): url = reverse('files:file-upload') filename = 'bahtiyar.jpg' client = APIClient() client.login(username='johndoe', password='johndoe') client.credentials(HTTP_CONTENT_DISPOSITION='attachment; filename={}'.format(filename)) file = DjangoFile(open('/data/{}'.format(filename), 'rb')) response = client.post(url, {'name': filename, 'attachment': file}) file.close() self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def handle(self, *args, **options): filename = options['filename'] datatype = options['datatype'] filetype = options['filetype'] coordSystem = options['coordSystem'] coordSystem2 = options['coordSystem2'] # coord = options['coord'] uid = options.get('uid') or slugid.nice().decode('utf-8') name = options.get('name') or op.split(filename)[1] if options['no_upload']: if not op.isfile(op.join(settings.MEDIA_ROOT, filename)): raise CommandError('File does not exist under media root') django_file = filename else: django_file = File(open(filename, 'rb')) # remove the filepath of the filename django_file.name = op.split(django_file.name)[1] tm.Tileset.objects.create( datafile=django_file, filetype=filetype, datatype=datatype, coordSystem=coordSystem, coordSystem2=coordSystem2, owner=None, uuid=uid, name=name)
def create_filer_image(self, folder=None): file_obj = DjangoFile(open(self.filename, 'rb'), name=self.image_name) image = Image.objects.create(owner=self.superuser, original_filename=self.image_name, file=file_obj, folder=folder) return image