我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用django.core.files.uploadedfile.InMemoryUploadedFile()。
def test_post_submission_require_auth_anonymous_user(self):
    """Submissions are answered with 401 once the profile requires auth.

    The view is called both without and with the ``username`` argument, and
    the number of ``Attachment`` rows must be unchanged afterwards.
    """
    self.user.profile.require_auth = True
    self.user.profile.save()
    count = Attachment.objects.count()
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # A JPEG is binary data: open it in 'rb' so it is never decoded as text
    # or subjected to newline translation.
    with open(path, 'rb') as image_fp:
        f = InMemoryUploadedFile(image_fp, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post('/submission', data)
            response = self.view(request)
            self.assertEqual(response.status_code, 401)
            response = self.view(request, username=self.user.username)
            self.assertEqual(response.status_code, 401)
            self.assertEqual(count, Attachment.objects.count())
def create_media(media):
    """Download the file behind ``media.data_value`` and attach it to ``media``.

    When ``media.data_value`` passes ``is_valid_url`` the URL is streamed into
    a temporary file, ``media.data_value`` is replaced by the last path
    segment of the URL, and ``media.data_file`` is set to an
    ``InMemoryUploadedFile`` wrapping the downloaded content.

    Returns the updated ``media`` object, or ``None`` when the value is not a
    valid URL.
    """
    if not is_valid_url(media.data_value):
        return None

    filename = media.data_value.split('/')[-1]
    data_file = NamedTemporaryFile()
    # guess_type() returns a (type, encoding) tuple; only the MIME type (which
    # may be None) is a meaningful content_type.
    content_type, _encoding = mimetypes.guess_type(filename)

    with closing(requests.get(media.data_value, stream=True)) as r:
        for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
            if chunk:  # skip empty chunks
                data_file.write(chunk)

    # Measure through the handle itself: seeking to the end and asking for the
    # position does not depend on the buffered data having reached the disk.
    data_file.seek(0, os.SEEK_END)
    size = data_file.tell()
    data_file.seek(0)

    media.data_value = filename
    media.data_file = InMemoryUploadedFile(
        data_file, 'data_file', filename, content_type, size, charset=None)
    return media
def test_windows_csv_file_upload(self):
    """A CSV uploaded as ``application/octet-stream`` is stored as ``text/csv``.

    Uploads the transportation fixture through ``MetaData.media_upload`` and
    checks that exactly one media row is added with the expected file type.
    """
    count = MetaData.objects.filter(data_type='media').count()
    media_file = os.path.join(
        self.this_directory, 'fixtures', 'transportation',
        'transportation.csv')
    # Use a context manager so the fixture handle is closed even if the
    # upload raises; the original left the file open.
    with open(media_file) as csv_fp:
        f = InMemoryUploadedFile(csv_fp, 'media', 'transportation.csv',
                                 'application/octet-stream', 2625, None)
        MetaData.media_upload(self.xform, f)
    media_list = MetaData.objects.filter(data_type='media')
    new_count = media_list.count()
    self.assertEqual(count + 1, new_count)
    media = media_list.get(data_value='transportation.csv')
    self.assertEqual(media.data_file_type, 'text/csv')
def crop_image(self, data_image, ratio):
    """Crop an uploaded image and return it as a new in-memory upload.

    Parameters:
        data_image: uploaded file exposing ``name`` and ``content_type``.
        ratio: crop box handed unchanged to ``Image.crop``.

    Returns:
        An ``InMemoryUploadedFile`` holding the cropped image, rewound to the
        start and carrying its real byte size.
    """
    image = Image.open(data_image)
    cropped_image = image.crop(ratio)

    # Derive the encoder name from the MIME subtype. 'image/jpg' is a common
    # non-standard spelling, but the encoder is registered as 'JPEG'.
    image_format = data_image.content_type.split('/')[-1].upper()
    if image_format == 'JPG':
        image_format = 'JPEG'

    # saving image to memory
    thumb_io = io.BytesIO()
    cropped_image.save(thumb_io, image_format)
    size = thumb_io.tell()  # position after writing == number of bytes
    thumb_io.seek(0)  # hand the consumer a stream positioned at the start

    # creating new InMemoryUploadedFile() based on the modified file
    return InMemoryUploadedFile(
        thumb_io,
        'photo',
        data_image.name,  # public accessor instead of private _get_name()
        data_image.content_type,
        size,
        None,
    )
def test_render_with_uploaded_image(self):
    """
    Rendering a freshly uploaded image must delegate to the parent widget
    and leave ``template_with_initial`` untouched.
    """
    # start from a known template_with_initial value
    self.widget.template_with_initial = "bar"
    # stand-in for an image that has just been uploaded
    uploaded = mock.MagicMock(spec=InMemoryUploadedFile)
    parent_patch = mock.patch.object(
        AdminFileWidget, 'render', return_value='foo'
    )
    with parent_patch as parent_render:
        rendered = widgets.ImageWidget.render(self.widget, 'name', uploaded)
        # the parent's render receives the same arguments
        parent_render.assert_called_with('name', uploaded, None)
        # the parent's result is returned as-is
        self.assertEqual(rendered, 'foo')
        # the template has not been modified
        self.assertEqual(self.widget.template_with_initial, 'bar')
def perform_image_crop(image_obj, crop_rect=None):
    """Crop an uploaded image to ``crop_rect``.

    Parameters:
        image_obj: uploaded file exposing ``name``, ``content_type`` and
            ``read()``.
        crop_rect: box passed to ``Image.crop``; when ``None`` the upload is
            returned untouched.

    Returns:
        ``image_obj`` itself when no crop is requested, otherwise a new
        ``InMemoryUploadedFile`` with the cropped image.
    """
    if crop_rect is None:
        return image_obj

    # The output format comes from the file extension; the encoder is
    # registered as 'JPEG', so map the 'JPG' spelling onto it.
    img_ext = os.path.splitext(image_obj.name)[1][1:].upper()
    if img_ext == 'JPG':
        img_ext = 'JPEG'

    base_image = Image.open(BytesIO(image_obj.read()))
    buffer_ = BytesIO()
    base_image.crop(crop_rect).save(buffer_, format=img_ext)
    data = buffer_.getvalue()
    cropped_file = ContentFile(data)

    # The original passed the bound method ``tmp_file.tell`` (never called)
    # as the size; pass the real byte count instead.
    return uploadedfile.InMemoryUploadedFile(
        cropped_file,
        None,
        image_obj.name,
        image_obj.content_type,
        len(data),
        None,
    )
def sms_media_to_file(file_object, name):
    """Wrap raw content or a file-like object in an ``InMemoryUploadedFile``.

    Parameters:
        file_object: a string of raw content (wrapped in ``io.BytesIO``) or a
            seekable file-like object.
        name: file name; surrounding whitespace is stripped, and the content
            type and charset are guessed from it.

    Returns:
        An ``InMemoryUploadedFile`` positioned at the start of the content.
    """
    if isinstance(file_object, basestring):
        file_object = io.BytesIO(file_object)

    # Find the size by seeking to the end instead of reading the whole
    # content into memory just to discard it.
    file_object.seek(0, io.SEEK_END)
    size = file_object.tell()
    file_object.seek(0)

    name = name.strip()
    content_type, charset = mimetypes.guess_type(name)
    return InMemoryUploadedFile(file=file_object, name=name,
                                field_name=None, content_type=content_type,
                                charset=charset, size=size)
def save(self, commit=True):
    """Save the form, normalising empty strings and shrinking uploaded images.

    For every form field present on the instance, an empty string value is
    replaced by ``None``. For fields that are ``ImageField`` on the model, a
    newly uploaded image is passed through ``shrinkImageFromData`` before
    being set on the instance. The instance is saved only when ``commit`` is
    true, and is returned either way.
    """
    instance = super(FormWithRequest, self).save(commit=False)
    model = self.Meta.model
    upload_types = (InMemoryUploadedFile, TemporaryUploadedFile)
    for field in self.fields.keys():
        if not hasattr(instance, field):
            continue
        # Fix empty strings to None
        if (type(getattr(instance, field)) in (unicode, str)
                and getattr(instance, field) == ''):
            setattr(instance, field, None)
        # Shrink freshly uploaded images on ImageField columns
        if (field in dir(model)
                and type(model._meta.get_field(field)) == models.models.ImageField):
            image = self.cleaned_data[field]
            if image and isinstance(image, upload_types):
                filename = image.name
                image = shrinkImageFromData(image.read(), filename)
                setattr(instance, field, image)
    if commit:
        instance.save()
    return instance
def retrieve(self, key, field_name):
    """Rebuild a stored upload from the backend.

    Looks up the metadata for ``key`` and the content stored under the
    prefixed key with a ``_content`` suffix. Returns an
    ``InMemoryUploadedFile`` rewound to the start, or ``None`` when either
    the metadata or the content is missing or empty.
    """
    metadata = self.metadata(key)
    content = self.backend.get(self.prefix(key) + '_content')
    if not (metadata and content):
        return None

    upload = InMemoryUploadedFile(
        file=BytesIO(content),
        field_name=field_name,
        name=metadata['name'],
        content_type=metadata['content_type'],
        size=metadata['size'],
        charset=metadata['charset'],
    )
    upload.file.seek(0)
    return upload
def _createTxtFilestream(self, strData, **kwargs):
    """
    Build an in-memory text upload for use in tests.

    Parameters :
        strData : str, text that is UTF-8 encoded into the file content
    Optional Arguments :
        filename : str, name of the upload. Defaults to 'test.txt'
    Returns :
        InMemoryUploadedFile with content type 'text'
    """
    encoded = strData.encode('utf-8')
    return InMemoryUploadedFile(
        file=BytesIO(encoded),
        field_name=None,
        name=kwargs.get('filename', 'test.txt'),
        content_type='text',
        size=len(encoded),
        charset=None,
    )
def _create_gif_filestream_from_base64(self, str_base64, **kwargs):
    """
    Build an in-memory image upload from base64 data for use in tests.

    Parameters :
        str_base64 : base64-encoded file content
    Optional Arguments :
        filename : str, name of the upload. Defaults to 'test.gif'
    Returns :
        InMemoryUploadedFile with content type 'image'
    """
    decoded = base64.b64decode(str_base64)
    return InMemoryUploadedFile(
        file=BytesIO(decoded),
        field_name=None,
        name=kwargs.get('filename', 'test.gif'),
        content_type='image',
        size=len(decoded),
        charset=None,
    )
def file_complete(self, file_size):
    """
    Return the collected upload as a file object when this handler is
    activated; otherwise return ``None``.
    """
    if self.activated:
        self.file.seek(0)  # rewind before handing the file over
        return InMemoryUploadedFile(
            file=self.file,
            field_name=self.field_name,
            name=self.file_name,
            content_type=self.content_type,
            size=file_size,
            charset=self.charset,
        )
    return None
def _internal_service_call(self, share_data):
    """Run the internal image service for the request in ``share_data``.

    The request data is base64-decoded and wrapped as an in-memory JPEG
    upload named ``test.jpg``. For request type ``"image"`` it is passed to
    ``PredictNetCnn().run`` and the first predicted key is mapped through a
    name table to build the output text; for any other request type a fixed
    message is stored instead.

    Returns ``share_data`` with its output data set.

    Errors now propagate with their original type and traceback. The previous
    ``except Exception as e: raise Exception(e)`` wrapper only erased the
    exception type; callers catching ``Exception`` are unaffected.
    """
    # internal : IMAGE
    print("?????????? ??? ?? ?? ?? ?? ?????????? ")
    request_type = share_data.get_request_type()
    decode_text = base64.decodebytes(str.encode(share_data.get_request_data()))
    upload = InMemoryUploadedFile(io.BytesIO(decode_text), None, 'test.jpg',
                                  'image/jpeg', len(decode_text), None)
    ml = MultiValueDict({'test': [upload]})

    # CNN Prediction
    if request_type == "image":
        return_val = PredictNetCnn().run('nn00004', None, ml)
        name_tag = {"KYJ": "???", "KSW": "???", "LTY": "???", "LSH": "???",
                    "PJH": "???", "KSS": "???", "PSC": "???"}
        predicted_key = return_val['test.jpg']['key'][0]
        print("?????????? ??? ?? ?? ?? ?? : " + predicted_key)
        share_data.set_output_data(name_tag[predicted_key] + "?? ??? ????")
    else:
        share_data.set_output_data("??? ?? ??? ????")
    return share_data
def upload_major_info(request):
    """Bulk-create ``Major`` rows from an uploaded JSON file.

    Only superusers may call this (403 otherwise). The upload is read from
    ``request.FILES['file']`` and must contain a JSON list of objects with
    ``major_short`` and ``major_full`` keys.

    Returns 201 once the upload has been processed (an ``IntegrityError``
    stops the import and is printed, as before), or 400 when no file was
    sent.
    """
    if not request.user.is_superuser:
        return Response(status=status.HTTP_403_FORBIDDEN)

    upload = request.FILES.get('file', False)
    if not upload:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    # Every uploaded-file object is itself file-like, so it can be parsed
    # directly. The original re-opened the temporary path without closing it
    # and left ``temp_file`` unbound for any other upload type.
    majors = json.load(upload)
    try:
        for major in majors:
            Major.objects.create(major_short=major['major_short'],
                                 major_full=major['major_full'])
    except IntegrityError as e:
        print(e.message)
    return Response(status=status.HTTP_201_CREATED)
def upload_professor_info(request):
    """Bulk-create ``Professor`` rows from an uploaded JSON file.

    Only superusers may call this (403 otherwise). The upload is read from
    ``request.FILES['file']`` and must contain a JSON list of objects with a
    ``name`` key and optional ``email`` / ``address`` keys. The name is
    upper-cased and split on whitespace; the first two words become the first
    and last name.

    Returns 201 once the upload has been processed (an ``IntegrityError``
    stops the import and is printed, as before), or 400 when no file was
    sent.
    """
    if not request.user.is_superuser:
        return Response(status=status.HTTP_403_FORBIDDEN)

    upload = request.FILES.get('file', False)
    if not upload:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    # Every uploaded-file object is itself file-like, so it can be parsed
    # directly. The original re-opened the temporary path without closing it
    # and left ``temp_file`` unbound for any other upload type.
    professors = json.load(upload)
    try:
        for professor in professors:
            name = professor['name'].upper().split()
            email = professor.get('email', '')
            office = professor.get('address', '')
            Professor.objects.create(first_name=name[0], last_name=name[1],
                                     email=email, office=office)
    except IntegrityError as e:
        print(e.message)
    return Response(status=status.HTTP_201_CREATED)
def strip_img_metadata(in_memory_img):
    """Re-encode a JPEG upload through PIL and return the new upload.

    Returns ``None`` for a falsy argument. Only objects whose type is exactly
    ``InMemoryUploadedFile`` with a JPEG content type are re-encoded; anything
    else is returned unchanged.
    """
    if not in_memory_img:
        return None

    if type(in_memory_img) == InMemoryUploadedFile:
        content_type = in_memory_img.content_type
    else:
        # XXX some testcases seem to create files with wrong file format. In
        # case of erroneous magic bytes we do not strip any data.
        # TODO is there something we can do about that?
        content_type = ""

    # TODO remove additional metadata for other formats
    if content_type not in ("image/jpeg", "image/jpg"):
        return in_memory_img

    # jpeg: write the image into a fresh buffer and wrap it as a new upload
    img = Image.open(in_memory_img)
    reencoded = io.BytesIO()
    img.save(reencoded, format='JPEG')
    new_img = InMemoryUploadedFile(
        reencoded, None, in_memory_img.name, 'image/jpeg',
        reencoded.getbuffer().nbytes, None)
    new_img.seek(0)
    img.close()
    return new_img
def image_create(request, **kwargs):
    """Create image.

    :param kwargs:
        * copy_from: URL from which Glance server should immediately copy
            the data and store it in its configured image store.
        * data: Form data posted from client.
        * location: URL where the data for this image already resides.

    In the case of 'copy_from' and 'location', the Glance server
    will give us a immediate response from create and handle the data
    asynchronously.

    In the case of 'data' the process of uploading the data may take
    some time and is handed off to a separate thread.
    """
    data = kwargs.pop('data', None)
    image = glanceclient(request).images.create(**kwargs)
    if not data:
        return image

    if isinstance(data, TemporaryUploadedFile):
        # Hack to fool Django, so we can keep file open in the new thread.
        data.file.close_called = True
    if isinstance(data, InMemoryUploadedFile):
        # Clone a new file for InMemoryUploadedFile,
        # because the old one will be closed by Django.
        data = SimpleUploadedFile(data.name, data.read(), data.content_type)

    update_kwargs = {'data': data, 'purge_props': False}
    thread.start_new_thread(image_update, (request, image.id), update_kwargs)
    return image
def test_save_no_location(self, mock_blob, mock_bucket):
    """Without a location the blob is created under the bare file name."""
    storage = Storage()
    upload = InMemoryUploadedFile(
        six.StringIO('1'), '', 'test.jpg', 'text/plain', 1, 'utf8')
    with upload:
        storage._save('', upload)
        mock_blob.assert_called_once_with('test.jpg', mock_bucket())
def test_save_with_location(self, mock_blob, mock_bucket):
    """With a location the blob name is prefixed by that location."""
    storage = Storage(location='images')
    upload = InMemoryUploadedFile(
        six.StringIO('1'), '', 'test.jpg', 'text/plain', 1, 'utf8')
    with upload:
        storage._save('', upload)
        mock_blob.assert_called_once_with('images/test.jpg', mock_bucket())
def get_image_in_memory_data():
    """
    Creates the InMemoryUploadedFile object using the data returned by
    get_image_data(), to save it into the ImageField of the database.

    The reported size is the number of bytes in the stream.
    sys.getsizeof() measured the Python object, not its content.
    """
    stream = get_image_data()  # get a red rectangle 200x200px
    # Content length = position after seeking to the end (whence=2).
    stream.seek(0, 2)
    size = stream.tell()
    # create the InMemoryUploadedFile object with the 'foo.jpg' file
    image_file = InMemoryUploadedFile(stream, None, 'foo.jpg', 'jpeg',
                                      size, None)
    image_file.seek(0)  # seek to the beginning
    return image_file
def create_in_memory_image(image, name, size):
    """
    Resizes the image and saves it as InMemoryUploadedFile object.

    Returns the InMemoryUploadedFile object with the image data. Its size is
    the number of encoded bytes; sys.getsizeof() measured the Python buffer
    object instead of its content.
    """
    output = io.BytesIO()  # create an io object
    # resize the image and save it to the io object
    image_resize(image, output, size)
    data = output.getvalue()
    # get MIME type of the image
    mime = magic.from_buffer(data, mime=True)
    # create InMemoryUploadedFile using data from the io
    return uploadedfile.InMemoryUploadedFile(output, 'ImageField', name,
                                             mime, len(data), None)
def test_post_submission_anonymous(self):
    """An anonymous submission with a media file is accepted with 201.

    Also checks the OpenRosa headers, ``Date``, ``Content-Type`` and
    ``Location`` of the response.
    """
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # A JPEG is binary data: open it in 'rb' so it is never decoded as text
    # or subjected to newline translation.
    with open(path, 'rb') as image_fp:
        f = InMemoryUploadedFile(image_fp, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post(
                '/%s/submission' % self.user.username, data)
            request.user = AnonymousUser()
            response = self.view(request, username=self.user.username)
            self.assertContains(response, 'Successful submission',
                                status_code=201)
            self.assertTrue(response.has_header('X-OpenRosa-Version'))
            self.assertTrue(
                response.has_header('X-OpenRosa-Accept-Content-Length'))
            self.assertTrue(response.has_header('Date'))
            self.assertEqual(response['Content-Type'],
                             'text/xml; charset=utf-8')
            self.assertEqual(
                response['Location'],
                'http://testserver/%s/submission' % self.user.username)
def test_post_submission_authenticated(self):
    """A submission is rejected with 401 first, then accepted with digest auth.

    The second request is rebuilt from the same payload, so both files are
    rewound before it is created.
    """
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # A JPEG is binary data: open it in 'rb' so it is never decoded as text
    # or subjected to newline translation.
    with open(path, 'rb') as image_fp:
        f = InMemoryUploadedFile(image_fp, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post('/submission', data)
            response = self.view(request)
            self.assertEqual(response.status_code, 401)

            # Rewind the files and redo the request since they were consumed
            # while the first request was encoded. The media file has to be
            # rewound too, otherwise it is re-sent empty.
            sf.seek(0)
            f.seek(0)
            request = self.factory.post('/submission', data)
            auth = DigestAuth('bob', 'bobbob')
            request.META.update(auth(request.META, response))
            response = self.view(request, username=self.user.username)
            self.assertContains(response, 'Successful submission',
                                status_code=201)
            self.assertTrue(response.has_header('X-OpenRosa-Version'))
            self.assertTrue(
                response.has_header('X-OpenRosa-Accept-Content-Length'))
            self.assertTrue(response.has_header('Date'))
            self.assertEqual(response['Content-Type'],
                             'text/xml; charset=utf-8')
            self.assertEqual(response['Location'],
                             'http://testserver/submission')
def test_windows_csv_file_upload_to_metadata(self):
    """Posting the CSV fixture as media metadata records it as ``text/csv``."""
    data_value = 'transportation.csv'
    csv_path = os.path.join(self.fixture_dir, data_value)
    with open(csv_path) as csv_fp:
        upload = InMemoryUploadedFile(
            csv_fp, 'media', data_value, 'text/csv', 2625, None)
        payload = {
            'data_value': data_value,
            'data_file': upload,
            'data_type': 'media',
            'xform': self.xform.pk,
        }
        self._post_form_metadata(payload)
        self.assertEqual(self.metadata.data_file_type, 'text/csv')
def django_file(path, field_name, content_type):
    """Wrap the file at ``path`` in an ``InMemoryUploadedFile``.

    The file is opened in binary mode so arbitrary content (images,
    spreadsheets, ...) is passed through without text decoding or newline
    translation. The returned object owns the open handle; the caller is
    responsible for closing it.
    """
    # adapted from here: http://groups.google.com/group/django-users/
    # browse_thread/thread/834f988876ff3c45/
    f = open(path, 'rb')
    return InMemoryUploadedFile(
        file=f,
        field_name=field_name,
        name=f.name,
        content_type=content_type,
        size=os.path.getsize(path),
        charset=None
    )
def django_file(file_obj, field_name, content_type):
    """Wrap ``file_obj`` in an ``InMemoryUploadedFile``.

    The name and size are taken from ``file_obj.name`` and ``file_obj.size``;
    the charset is left unset.
    """
    upload_kwargs = dict(
        file=file_obj,
        field_name=field_name,
        name=file_obj.name,
        content_type=content_type,
        size=file_obj.size,
        charset=None,
    )
    return InMemoryUploadedFile(**upload_kwargs)
def capture_screenshot(project):
    """Captures screenshot of Project website and saves it as project photo.

    If Project website is not available, then project source is used. The
    screenshot is cropped to 1024x420 and stored as ``project.png`` through
    ``Photo.objects.update_or_create``.
    """
    screenshot_url = project.website_link or project.source_link

    driver = webdriver.PhantomJS()
    try:
        driver.set_window_size(1024, 420)
        # Get URL
        driver.get(screenshot_url)
        screenshot = driver.get_screenshot_as_png()
    finally:
        # Always shut the browser process down; the original never did.
        driver.quit()

    img = Image.open(BytesIO(screenshot))

    # Crop image
    box = (0, 0, 1024, 420)
    img = img.crop(box)

    # Save image in a buffer and create a content file
    buffer_ = BytesIO()
    img.save(buffer_, 'PNG')
    data = buffer_.getvalue()
    content_file = ContentFile(data)

    # Save content file in memory and pass it to Project photo.
    # File is automatically renamed by Django in case of conflicts.
    # The size is the real byte count; the original passed the uncalled
    # bound method ``content_file.tell``.
    image_file = InMemoryUploadedFile(content_file, None, 'project.png',
                                      'image/png', len(data), None)
    Photo.objects.update_or_create(project=project,
                                   defaults={'image': image_file})
def notebook_maker():
    """Return a factory that builds a minimal in-memory notebook upload."""

    def maker(extension='ipynb'):
        # The content is the two-byte JSON document ``{}``.
        filename = 'test-notebook.%s' % extension
        payload = io.BytesIO(b'{}')
        return InMemoryUploadedFile(
            file=payload,
            field_name='notebook',
            name=filename,
            content_type='text/plain',
            size=2,
            charset='utf8',
        )

    return maker
def create_thumbnail(file_path):
    """Create and store a thumbnail for the image at ``file_path``.

    The image is read from ``default_storage``, converted to RGB when its
    mode is neither ``L`` nor ``RGB``, fitted to ``THUMBNAIL_SIZE`` and saved
    in the source image's format under the name given by
    ``utils.get_thumb_filename``.

    Returns the name under which ``default_storage`` saved the thumbnail.
    """
    thumbnail_filename = utils.get_thumb_filename(file_path)
    thumbnail_format = utils.get_image_format(os.path.splitext(file_path)[1])

    # Close the storage handle once the pixels have been processed; the
    # original left it open.
    with default_storage.open(file_path) as source:
        image = Image.open(source)
        file_format = image.format  # read before convert(), which drops it

        # Convert to RGB if necessary
        # Thanks to Limodou on DjangoSnippets.org
        # http://www.djangosnippets.org/snippets/20/
        if image.mode not in ('L', 'RGB'):
            image = image.convert('RGB')

        # scale and crop to thumbnail
        imagefit = ImageOps.fit(image, THUMBNAIL_SIZE, Image.ANTIALIAS)

    thumbnail_io = BytesIO()
    imagefit.save(thumbnail_io, format=file_format)

    thumbnail = InMemoryUploadedFile(
        thumbnail_io,
        None,
        thumbnail_filename,
        thumbnail_format,
        len(thumbnail_io.getvalue()),
        None)
    thumbnail.seek(0)

    return default_storage.save(thumbnail_filename, thumbnail)
def clean(self):
    """Normalise the e-mail address and resize a custom avatar.

    The e-mail is stripped and lower-cased on both sides of the ``@``. An
    avatar other than ``USERPROFILE_IMAGE`` is resized to 150x150, converted
    to RGB, re-encoded as JPEG and assigned back as an in-memory upload.
    """
    super().clean()

    # It will not allow any uppercase letters in 'email'.
    email_name, domain_part = self.email.strip().split('@')
    self.email = '@'.join([email_name.lower(), domain_part.lower()])
    # self.email = self.__class__.objects.normalize_email(self.email)

    if self.avatar is not None and self.avatar.name != self.USERPROFILE_IMAGE:
        from PIL import Image

        image = Image.open(self.avatar)
        image = image.resize((150, 150), Image.ANTIALIAS)
        image = image.convert('RGB')
        buffer = BytesIO()
        image.save(fp=buffer, format='JPEG')
        data = buffer.getvalue()
        image_content = ContentFile(data)

        self.avatar = InMemoryUploadedFile(
            image_content,     # file
            None,              # field_name
            str(self.avatar),  # name
            'image/jpeg',      # content_type
            # size: the real byte count; the original passed the uncalled
            # bound method ``image_content.tell``
            len(data),
            None,              # charset
        )
def generate_qrcode(self):
    """Render a QR code for this object's redirect URL and store it.

    The URL is built from the current ``Site`` and ``self.api_key``; the
    image is saved on ``self.qrcode`` as ``qrcode-<api_key>.png``.
    """
    key = str(self.api_key)
    target = ''.join([
        'http://',
        str(Site.objects.get_current()),
        '/boites/redirect/',
        key,
    ])
    png_buffer = BytesIO()
    qrcode.make(target).save(png_buffer)
    filename = 'qrcode-%s.png' % key
    upload = InMemoryUploadedFile(
        png_buffer, None, filename, 'image/png',
        len(png_buffer.getvalue()), None)
    self.qrcode.save(filename, upload)
def replace_first_line(file, replacement):
    """Replace the first line of the file at path ``file`` unless it has a DOCTYPE.

    In-memory uploads are skipped, since they are not paths on disk. An empty
    file is left untouched. The file is read and written with the same
    encoding so non-ASCII characters survive the rewrite.
    """
    # suppress a type error which was confusing
    if isinstance(file, InMemoryUploadedFile):
        return

    encoding = 'iso-8859-1'
    with open(file, 'r', encoding=encoding) as f:
        data = f.readlines()

    if not data:
        # Nothing to inspect; the original raised IndexError here.
        return

    if "!DOCTYPE" not in data[0]:
        data[0] = replacement
        # Write back with the encoding used for reading; the original wrote
        # with the platform default.
        with open(file, 'w', encoding=encoding) as f:
            f.writelines(data)
def generate_resized_image(source, size):
    """Return a JPEG thumbnail of ``source`` as an in-memory upload.

    The image is shrunk in place with ``thumbnail(size)`` and the upload is
    named after the first element of ``size``, e.g. ``200.jpg``.
    """
    thumb = Image.open(source)
    thumb.thumbnail(size, Image.ANTIALIAS)

    jpeg_buffer = StringIO.StringIO()
    thumb.save(jpeg_buffer, 'JPEG')
    thumb.close()

    upload_name = str(size[0]) + '.jpg'
    return InMemoryUploadedFile(
        jpeg_buffer, None, upload_name, 'image/jpeg', jpeg_buffer.len, None)
def save(self, *args, **kwargs):
    """Overrides the save method for the model.

    Fetches ``self.original_image`` by URL, applies ``self.effects`` with
    ``apply_effects``, stores the result on ``self.phedited_image`` and then
    delegates to the parent ``save``.
    """
    # fetches the original image from its url and loads it to memory
    image_request = requests.get(self.original_image)
    image_bytes = io.BytesIO(image_request.content)
    image = Image.open(image_bytes)
    filename = os.path.basename(self.original_image)

    # call the apply_effects function on the in-memory original image
    image_with_effect = apply_effects(image, self.effects)

    # create an empty in-memory bytes section to hold the edited image
    edited_image = io.BytesIO()
    image_with_effect.save(edited_image, format=image.format)

    # initiate an upload from the in-memory edited image
    edited_file = InMemoryUploadedFile(
        edited_image,
        None,
        filename,
        'image/jpeg',
        # After writing, the stream position equals the byte count. The
        # original passed the bound method without calling it.
        edited_image.tell(),
        None
    )
    self.phedited_image.save(
        filename,
        edited_file,
        save=False,
    )
    super(PheditedImage, self).save(*args, **kwargs)
def test_post_submission_anonymous(self):
    """An anonymous submission with a media file is accepted with 201.

    Also checks the OpenRosa headers, ``Date``, ``Content-Type`` and
    ``Location`` of the response.
    """
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # A JPEG is binary data: open it in 'rb' so it is never decoded as text
    # or subjected to newline translation.
    with open(path, 'rb') as image_fp:
        f = InMemoryUploadedFile(image_fp, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            # Request paths are absolute: keep the leading slash so the path
            # matches the Location asserted below.
            request = self.factory.post(
                '/%s/submission' % self.user.username, data)
            request.user = AnonymousUser()
            response = self.view(request, username=self.user.username)
            self.assertContains(response, 'Successful submission',
                                status_code=201)
            self.assertTrue(response.has_header('X-OpenRosa-Version'))
            self.assertTrue(
                response.has_header('X-OpenRosa-Accept-Content-Length'))
            self.assertTrue(response.has_header('Date'))
            self.assertEqual(response['Content-Type'],
                             'text/xml; charset=utf-8')
            self.assertEqual(
                response['Location'],
                'http://testserver/%s/submission' % self.user.username)
def test_post_submission_authenticated(self):
    """A submission is rejected with 401 first, then accepted with digest auth.

    The same request object is reused for the authenticated call after the
    digest credentials have been added to its ``META``.
    """
    s = self.surveys[0]
    media_file = "1335783522563.jpg"
    path = os.path.join(self.main_directory, 'fixtures',
                        'transportation', 'instances', s, media_file)
    # A JPEG is binary data: open it in 'rb' so it is never decoded as text
    # or subjected to newline translation.
    with open(path, 'rb') as image_fp:
        f = InMemoryUploadedFile(image_fp, 'media_file', media_file,
                                 'image/jpg', os.path.getsize(path), None)
        submission_path = os.path.join(
            self.main_directory, 'fixtures',
            'transportation', 'instances', s, s + '.xml')
        with open(submission_path) as sf:
            data = {'xml_submission_file': sf, 'media_file': f}
            request = self.factory.post('/submission', data)
            response = self.view(request)
            self.assertEqual(response.status_code, 401)
            auth = DigestAuth('bob', 'bobbob')
            request.META.update(auth(request.META, response))
            response = self.view(request, username=self.user.username)
            self.assertContains(response, 'Successful submission',
                                status_code=201)
            self.assertTrue(response.has_header('X-OpenRosa-Version'))
            self.assertTrue(
                response.has_header('X-OpenRosa-Accept-Content-Length'))
            self.assertTrue(response.has_header('Date'))
            self.assertEqual(response['Content-Type'],
                             'text/xml; charset=utf-8')
            self.assertEqual(response['Location'],
                             'http://testserver/submission')
def test_windows_csv_file_upload_to_metadata(self):
    """A CSV posted as ``application/octet-stream`` is recorded as ``text/csv``."""
    data_value = 'transportation.csv'
    csv_path = os.path.join(self.fixture_dir, data_value)
    with open(csv_path) as csv_fp:
        upload = InMemoryUploadedFile(
            csv_fp, 'media', data_value, 'application/octet-stream',
            2625, None)
        payload = {
            'data_value': data_value,
            'data_file': upload,
            'data_type': 'media',
            'xform': self.xform.pk,
        }
        self._post_form_metadata(payload)
        self.assertEqual(self.metadata.data_file_type, 'text/csv')
def set_object_permissions(sender, instance=None, created=False, **kwargs):
    """Signal handler: assign owner permissions and upload external choices.

    On creation the owner role is given to ``instance.user`` (and to
    ``instance.created_by`` when that is a different user), and the project's
    permissions are applied to the form. Whenever the instance reports
    external choices, its ``external_choices`` sheet is converted to CSV and
    uploaded as ``itemsets.csv`` media.
    """
    # seems the super is not called, have to get xform from here
    xform = XForm.objects.get(pk=instance.pk)

    if created:
        from formshare.libs.permissions import OwnerRole
        OwnerRole.add(instance.user, xform)

        creator = instance.created_by
        if creator and instance.user != creator:
            OwnerRole.add(creator, xform)

        from formshare.libs.utils.project_utils import set_project_perms_to_xform
        set_project_perms_to_xform(xform, instance.project)

    if not (hasattr(instance, 'has_external_choices')
            and instance.has_external_choices):
        return

    instance.xls.seek(0)
    csv_file = sheet_to_csv(instance.xls.read(), 'external_choices')
    # measure the CSV by seeking to its end, then rewind for the upload
    csv_file.seek(0, os.SEEK_END)
    csv_size = csv_file.tell()
    csv_file.seek(0)

    from formshare.apps.main.models.meta_data import MetaData
    data_file = InMemoryUploadedFile(
        file=csv_file,
        field_name='data_file',
        name='itemsets.csv',
        content_type='text/csv',
        size=csv_size,
        charset=None
    )
    MetaData.media_upload(xform, data_file)
def django_file(path, field_name, content_type):
    """Wrap the file at ``path`` in an ``InMemoryUploadedFile``.

    The file is opened in binary mode so arbitrary content (images,
    spreadsheets, ...) is passed through without text decoding or newline
    translation. The returned object owns the open handle; the caller is
    responsible for closing it.
    """
    # adapted from here:
    # http://groups.google.com/group/django-users/browse_thread/thread/
    # 834f988876ff3c45/
    f = open(path, 'rb')
    return InMemoryUploadedFile(
        file=f,
        field_name=field_name,
        name=f.name,
        content_type=content_type,
        size=os.path.getsize(path),
        charset=None
    )