我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.conf.settings.MEDIA_ROOT。
def tabla_encabezado(self, valorizado):
    """Build the report header table: company logo next to the report title.

    :param valorizado: truthy selects the "valued inventory" title, falsy the
        "physical units" title.
    :return: a one-row ``Table`` holding the logo (or a text placeholder)
        and the title paragraph.
    """
    sp = ParagraphStyle('parrafos',
                        alignment=TA_CENTER,
                        fontSize=14,
                        fontName="Times-Roman")
    try:
        archivo_imagen = os.path.join(settings.MEDIA_ROOT, str(EMPRESA.logo))
        imagen = Image(archivo_imagen, width=90, height=50, hAlign='LEFT')
    except Exception:
        # Best-effort logo: any failure falls back to a text placeholder.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        imagen = Paragraph(u"LOGO", sp)
    if valorizado:
        titulo = Paragraph(u"REGISTRO DEL INVENTARIO PERMANENTE VALORIZADO", sp)
    else:
        titulo = Paragraph(u"REGISTRO DEL INVENTARIO PERMANENTE EN UNIDADES FÍSICAS", sp)
    encabezado = [[imagen, titulo]]
    tabla_encabezado = Table(encabezado, colWidths=[2 * cm, 23 * cm])
    return tabla_encabezado
def download_annotated(request, individual_id):
    """Return the annotated VCF archive of an individual as a download.

    The archive ``annotation.final.vcf.zip`` is looked up in the directory
    part of the individual's ``vcf_file`` name; the download file name is
    derived from the part of the VCF file name before the first ``.vcf``.
    """
    individual = get_object_or_404(Individual, pk=individual_id)

    vcf_name = str(individual.vcf_file.name)
    directory, filename = os.path.split(vcf_name)
    stem = filename.partition('.vcf')[0]

    archive_path = '%s/annotation.final.vcf.zip' % (directory)
    archive = open(archive_path, 'rb')

    response = HttpResponse(archive, content_type='application/x-zip-compressed')
    disposition = 'attachment; filename=%s.annotated.mendelmd.vcf.zip' % stem
    response['Content-Disposition'] = disposition
    response['Content-Length'] = os.path.getsize(archive_path)
    return response
def __init__(self, location=None, base_url=None, file_permissions_mode=None,
             directory_permissions_mode=None):
    """Configure the storage, falling back to project settings.

    Every argument left as ``None`` is taken from the matching setting:
    ``MEDIA_ROOT``, ``MEDIA_URL``, ``FILE_UPLOAD_PERMISSIONS`` and
    ``FILE_UPLOAD_DIRECTORY_PERMISSIONS``. An explicit ``base_url`` gets a
    trailing slash appended when it lacks one.
    """
    self.base_location = settings.MEDIA_ROOT if location is None else location
    self.location = abspathu(self.base_location)

    if base_url is None:
        self.base_url = settings.MEDIA_URL
    elif base_url.endswith('/'):
        self.base_url = base_url
    else:
        self.base_url = base_url + '/'

    if file_permissions_mode is None:
        file_permissions_mode = settings.FILE_UPLOAD_PERMISSIONS
    self.file_permissions_mode = file_permissions_mode

    if directory_permissions_mode is None:
        directory_permissions_mode = settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS
    self.directory_permissions_mode = directory_permissions_mode
def check_settings(base_url=None):
    """Validate the staticfiles-related settings.

    :param base_url: URL to validate; defaults to ``settings.STATIC_URL``.
    :raises ImproperlyConfigured: if the URL is empty, equals ``MEDIA_URL``,
        or ``MEDIA_ROOT`` and ``STATIC_ROOT`` are both set and identical.
    """
    static_url = settings.STATIC_URL if base_url is None else base_url
    if not static_url:
        raise ImproperlyConfigured(
            "You're using the staticfiles app "
            "without having set the required STATIC_URL setting.")

    if settings.MEDIA_URL == static_url:
        raise ImproperlyConfigured("The MEDIA_URL and STATIC_URL "
                                   "settings must have different values")

    # Only compare the roots when both are configured.
    roots_configured = settings.MEDIA_ROOT and settings.STATIC_ROOT
    if roots_configured and settings.MEDIA_ROOT == settings.STATIC_ROOT:
        raise ImproperlyConfigured("The MEDIA_ROOT and STATIC_ROOT "
                                   "settings must have different values")
def static(prefix, view=serve, **kwargs):
    """Return URL patterns that serve files under ``prefix`` in debug mode.

    Typical use in a URLconf::

        from django.conf import settings
        from django.conf.urls.static import static

        urlpatterns = [
            # ... the rest of your URLconf goes here ...
        ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

    Returns an empty list when ``DEBUG`` is off or the prefix is non-local
    (contains ``://``).

    :raises ImproperlyConfigured: if ``DEBUG`` is on and the prefix is empty.
    """
    # The DEBUG check comes first: outside debug mode even an empty prefix
    # is a silent no-op.
    if not settings.DEBUG:
        return []
    if not prefix:
        raise ImproperlyConfigured("Empty static prefix not permitted")
    if '://' in prefix:
        return []

    pattern = r'^%s(?P<path>.*)$' % re.escape(prefix.lstrip('/'))
    return [url(pattern, view, kwargs=kwargs)]
def api_song_meta(request):
    """Return the serialized metadata of the song named by ``rawhash``.

    Only GET requests are handled. An unknown ``rawhash`` yields an empty
    JSON object. For non-Safari user agents, ALAC tracks are queued for AAC
    conversion (when ``settings.ALAC_CONVERTIBLE`` is set) and their file
    URL is switched to the cache URL.
    """
    def convert_for_no_safari(meta):
        wants_conversion = meta.codec == 'alac' and settings.ALAC_CONVERTIBLE
        if wants_conversion:
            convert_aac.delay(settings.MEDIA_ROOT + meta.filepath, meta.rawhash)
            meta.fileurl = meta.cacheurl
        return meta

    if request.method != 'GET':
        return None

    params = request.query_params
    try:
        meta = MusicLibrary.objects.get(rawhash=params['rawhash'])
    except MusicLibrary.DoesNotExist:
        return JsonResponse({}, safe=True)

    if not is_ua_safari(request):
        meta = convert_for_no_safari(meta)
    return JsonResponse(MusicMetaSerializer(meta).data)
def _create(self, fixture_file_path, less_verbose=0):
    """Create a zip fixture containing the media root and a database dump.

    The media root (``self._media_root``) is copied to a ``MEDIA_ROOT``
    folder inside a temporary directory, ``pg_dump`` writes ``db.sql`` next
    to it, and the directory is zipped to ``fixture_file_path``. The
    temporary directory is removed on success and kept on failure.

    :param fixture_file_path: target archive path; a trailing ``.zip`` is
        optional.
    :param less_verbose: added to the verbosity level of info messages.
    :raises CommandError: if ``pg_dump`` exits with a non-zero code.
    """
    # Bound before the try so the failure handler never hits a NameError
    # (which would mask the real exception) when we fail before mkdtemp().
    tmp_dir = None
    try:
        self.write_info('Creating fixture %s' % fixture_file_path, 1+less_verbose)
        fixture_file_path = re.sub(r'\.zip$', '', fixture_file_path)  # we strip away .zip if given
        tmp_dir = tempfile.mkdtemp()
        # copy media root
        shutil.copytree(self._media_root, join(tmp_dir, 'MEDIA_ROOT'))
        # database dump
        with open(join(tmp_dir, 'db.sql'), 'w') as fp:
            return_code = subprocess.call(
                ['pg_dump', '--clean', '--no-owner', self._database_name],
                stdout=fp)
        if return_code != 0:
            raise CommandError('pg_dump failed with exit code {}'.format(return_code))
        # creating the fixture archive
        archive_name = shutil.make_archive(fixture_file_path, 'zip', root_dir=tmp_dir)
        self.write_debug(subprocess.check_output(['unzip', '-l', archive_name]))
    except BaseException:
        # Keep the directory for inspection, then let the error propagate.
        if tmp_dir is not None:
            self.write_debug('Temporary directory %s kept due to exception.' % tmp_dir)
        raise
    else:
        self.write_info('... fixture created', 1+less_verbose)
        shutil.rmtree(tmp_dir)
def write_to_package_job(control, path, callback_version_id):
    """
    Rewrite a .deb package after its control fields were edited.

    The original package is copied to a uniquely named file under
    ``settings.TEMP_ROOT``, the new control dict is applied and saved on the
    copy, and the ``Version`` identified by ``callback_version_id`` is
    notified with the new package path.

    :param control: New Control Dict
    :type control: dict
    :param path: Original Package Path
    :type path: str
    :param callback_version_id: Callback Version ID, for callback query
    :type callback_version_id: int
    """
    # Work on a temporary copy, never on the original package.
    source = os.path.join(settings.MEDIA_ROOT, path)
    scratch = os.path.join(settings.TEMP_ROOT, str(uuid.uuid1()) + '.deb')
    shutil.copyfile(source, scratch)

    # Apply the edited control data and persist it.
    package = DebianPackage(scratch)
    package.control = control
    package.save()

    # Hand the resulting package back to the version that requested it.
    version = Version.objects.get(id=callback_version_id)
    version.write_callback(package.path)
def get(self, request, *args, **kwargs):
    """Decrypt and return the file identified by the ``path`` URL kwarg.

    ``path`` is either a URL (as decided by ``self._is_url``), fetched over
    HTTP, or a media URL path that is mapped onto ``settings.MEDIA_ROOT``
    and read from disk. The content is passed through
    ``Cryptographer.decrypted`` and returned with a content type sniffed
    from the decrypted bytes.

    :raises Http404: if no path was given, the resolved path is not inside
        ``MEDIA_ROOT``, or the file does not exist.
    """
    path = kwargs.get("path")

    # No path, nothing to serve.
    if not path:
        raise Http404

    if self._is_url(path):
        # NOTE(review): this fetches a caller-supplied URL without a
        # timeout -- confirm the URLconf restricts what can reach here.
        content = requests.get(path, stream=True).raw.read()
    else:
        media_root = settings.MEDIA_ROOT

        # Normalise the path to collapse ".." segments, then map the media
        # URL prefix onto the media root.
        path = os.path.normpath(path).replace(
            settings.MEDIA_URL, media_root, 1)

        # Reject anything outside the media root. The prefix must end with
        # a separator, otherwise a sibling such as "<root>_private" would
        # pass a plain startswith(MEDIA_ROOT) check.
        if media_root.endswith(os.sep):
            root_prefix = media_root
        else:
            root_prefix = media_root + os.sep
        if not path.startswith(root_prefix):
            raise Http404

        # The file requested doesn't exist locally. A legit 404.
        if not os.path.exists(path):
            raise Http404

        with open(path, "rb") as f:
            content = f.read()

    content = Cryptographer.decrypted(content)
    return HttpResponse(
        content, content_type=magic.Magic(mime=True).from_buffer(content))
def get_available_name(self, name, max_length=None):
    """Return ``name`` unchanged, deleting any existing file of that name.

    This makes uploads overwrite instead of being renamed. Found at
    http://djangosnippets.org/snippets/976/ ; an alternative discussed in
    https://code.djangoproject.com/ticket/11663 is to delete the old file
    in the model's ``save()``.

    :param name: storage-relative file name.
    :param max_length: accepted because Django's storage API passes it as a
        keyword; it is not used, since the name is never altered here.
    :return: ``name``, now free to be written to.
    """
    # If the filename already exists, remove it as if it was a true file system
    if self.exists(name):
        os.remove(os.path.join(settings.MEDIA_ROOT, name))
    return name
def create_by_values_and_gid(values: DataDict, gid: str) -> 'Archive':
    """Create an archive from ``values``, optionally linked to a gallery.

    When ``gid`` is truthy, the gallery with that gid is fetched or created,
    attached to the archive, and its tags are copied over. The archive's zip
    path is set under ``galleries/archives/<id>/`` and that directory is
    created below ``settings.MEDIA_ROOT``.
    """
    archive = Archive(**values)
    archive.simple_save()

    if gid:
        gallery = Gallery.objects.get_or_create(gid=gid)[0]
        archive.gallery = gallery
        archive.tags.set(gallery.tags.all())

    zip_name = replace_illegal_name(archive.title) + '.zip'
    archive.zipped = "galleries/archives/{id}/{file}".format(
        id=archive.id, file=zip_name)

    archive_dir = os.path.join(
        settings.MEDIA_ROOT,
        "galleries/archives/{id}".format(id=archive.id))
    os.makedirs(archive_dir, exist_ok=True)

    archive.save()
    return archive
def copy_img(self, img_path: str) -> None:
    """Store a copy of ``img_path`` as the image and generate its thumbnail.

    The image is copied through a temporary file into ``self.image``; a
    JPEG thumbnail bounded to 200x290 is then written below
    ``settings.MEDIA_ROOT`` and the instance is saved.

    :param img_path: path of the source image on disk.
    """
    extension = os.path.splitext(img_path)[1]

    # The context manager closes (and removes) the temp file even when the
    # copy or the field save raises.
    with NamedTemporaryFile() as tf2:
        shutil.copy(img_path, tf2.name)
        self.image.save(extension, File(tf2), save=False)

    im = PImage.open(self.image.path)
    if im.mode != 'RGB':
        im = im.convert('RGB')

    # large thumbnail
    im.thumbnail((200, 290), PImage.ANTIALIAS)
    thumb_relative_path = upload_announce_thumb_handler(self, extension)
    thumb_fn = pjoin(settings.MEDIA_ROOT, thumb_relative_path)
    os.makedirs(os.path.dirname(thumb_fn), exist_ok=True)
    im.save(thumb_fn, "JPEG")
    self.thumbnail.name = thumb_relative_path

    self.save()
def regen_tn(self) -> None:
    """Regenerate the JPEG thumbnail from the stored image.

    Does nothing when no image is set. The thumbnail is bounded to 200x290
    and written below ``settings.MEDIA_ROOT``; the instance is then saved.
    """
    if not self.image:
        return

    im = PImage.open(self.image.path)
    if im.mode != 'RGB':
        im = im.convert('RGB')

    # large thumbnail
    im.thumbnail((200, 290), PImage.ANTIALIAS)
    thumb_relative_path = upload_announce_thumb_handler(
        self, os.path.splitext(self.image.name)[1])
    # Create the directory the file is actually written to: the absolute
    # path under MEDIA_ROOT, not the relative name (which would resolve
    # against the current working directory).
    thumb_fn = pjoin(settings.MEDIA_ROOT, thumb_relative_path)
    os.makedirs(os.path.dirname(thumb_fn), exist_ok=True)
    im.save(thumb_fn, "JPEG")
    self.thumbnail.name = thumb_relative_path

    self.save()
def test_build_invalid_zip(self):
    """Building a buildable attachment from an invalid archive must raise."""
    # Stage the invalid archive inside the media root.
    attachment_path = os.path.join(settings.MEDIA_ROOT, 'test_attachment')
    copyfile(os.path.join(FILES_PATH, 'invalid_archive.zip'), attachment_path)

    attachment = Attachment(name='build.json', buildable=True)
    attachment.file = os.path.join('..', attachment_path)
    attachment.save()

    tracker = Tracker.objects.create(
        key=TRACKER_ATTACHMENT_EXECUTED,
        campaign_id=1,
        target_id=1,
        value='tracker: not opened',
    )

    with self.assertRaises(SuspiciousOperation):
        attachment.build(tracker)
def test_build_static(self):
    """Building a non-buildable attachment returns its file unchanged."""
    attachment_name = 'b64.png'
    attachment_path = os.path.join(settings.MEDIA_ROOT, 'test_attachment')
    copyfile(os.path.join(FILES_PATH, 'image.png'), attachment_path)
    # Clean media dir after test. Registered right away so the copied file
    # is removed even when an assertion below fails.
    self.addCleanup(os.remove, attachment_path)

    attachment = Attachment(
        name=attachment_name,
        buildable=False
    )
    attachment.file = os.path.join('..', attachment_path)
    attachment.save()

    kwargs = {
        'key': TRACKER_ATTACHMENT_EXECUTED,
        'campaign_id': 1,
        'target_id': 1,
        'value': 'tracker: not opened',
    }
    tracker = Tracker.objects.create(**kwargs)

    res = attachment.build(tracker)
    self.assertEqual(res, attachment.file)
def get_project(**kwargs):
    """Collect project diagnostics into an ordered mapping.

    The working and temp directories are always reported; each further
    section is included only when its ``config`` flag is truthy. ``kwargs``
    are forwarded to ``get_mail``.
    """
    def describe_root(path):
        # Path plus device information for one configured root directory.
        return OrderedDict([("path", path), ("disk", get_device_info(path))])

    project = OrderedDict([
        ("current_dir", os.path.realpath(os.curdir)),
        ("tempdir", tempfile.gettempdir()),
    ])

    if config.MEDIA_ROOT:
        project["MEDIA_ROOT"] = describe_root(settings.MEDIA_ROOT)
    if config.STATIC_ROOT:
        project["STATIC_ROOT"] = describe_root(settings.STATIC_ROOT)
    if config.CACHES:
        project["CACHES"] = get_caches_info()
    if config.installed_apps:
        project["installed_apps"] = get_installed_apps()
    if config.mail:
        project["mail"] = get_mail(**kwargs)

    return project
def test_delete_file(make_dirs):  # noqa
    """Deleting an S3File drops its cached file and removes it from disk."""
    source = create_file()
    target = os.path.join(settings.MEDIA_ROOT, 's3', 'uploads', 'text.txt')
    # Both handles are context-managed so the source file is closed too.
    with open(source.name, 'rb') as src, open(target, 'wb') as dest_file:
        dest_file.write(src.read())

    field = S3FileField(storage=FakeS3Storage())

    s3_file = S3File('/media/s3/uploads/text.txt', field)
    s3_file.file = dest_file
    s3_file.delete()

    assert not hasattr(s3_file, '_file')
    assert not os.path.isfile(target)


# #############################################################################
#
# S3FileField
#
# #############################################################################
def test_pre_save_replace_file():
    """Replacing the stored file via pre_save removes the previous file."""
    source = create_file()
    # Read the sample once through a context manager so the handle is closed.
    with open(source.name, 'rb') as src:
        payload = src.read()

    for relative_name in ('s3/uploads/text.txt', 's3/uploads/text2.txt'):
        target = os.path.join(settings.MEDIA_ROOT, relative_name)
        with open(target, 'wb') as dest_file:
            dest_file.write(payload)

    model_instance = FileModel(s3_file='/media/s3/uploads/text.txt')
    model_instance.save()
    model_instance.refresh_from_db()

    field = model_instance.s3_file.field
    field.storage = FakeS3Storage()
    model_instance.s3_file = '/media/s3/uploads/text2.txt'
    url = field.pre_save(model_instance, False)

    assert url == '/media/s3/uploads/text2.txt'
    assert not os.path.isfile(
        os.path.join(settings.MEDIA_ROOT, 's3/uploads/text.txt'))
def test_post_large_file(make_dirs, monkeypatch, settings):  # noqa
    """An upload one byte over the limit is rejected with 400 and not stored."""
    monkeypatch.setattr(views, 'default_storage', FakeS3Storage())
    source = create_file()
    # Context-managed read so the sample file handle is closed.
    with open(source.name, 'rb') as src:
        upload = SimpleUploadedFile('text.txt', src.read())
    # Pretend the upload is one byte over the configured maximum.
    upload.size = settings.AWS['MAX_FILE_SIZE'] + 1

    request = HttpRequest()
    request.method = 'POST'
    request.FILES = {'file': upload}
    request.POST = {'key': 'subdir/text.txt'}

    response = views.fake_s3_upload(request)

    assert response.status_code == 400
    assert response.content.decode('utf-8') == errors.EXCEED_MAX_SIZE.format(
        max_size=settings.AWS['MAX_FILE_SIZE'],
        proposed_size=settings.AWS['MAX_FILE_SIZE'] + 1)
    assert not os.path.isfile(
        os.path.join(settings.MEDIA_ROOT, 's3/uploads/subdir', 'text.txt'))
def _open(self, name, mode='rb'):
    """Download the object ``name`` from the bucket and return its local path.

    The object body is streamed in 1024-byte chunks into
    ``<MEDIA_ROOT>/s3/downloads/<last path segment of name>``. ``mode`` is
    accepted but not used.
    """
    s3 = self.get_boto_ressource()
    remote = s3.Object(self.bucket_name, name).get()

    local_path = os.path.join(
        settings.MEDIA_ROOT, 's3/downloads', name.split('/')[-1])

    with open(local_path, 'wb') as local_file:
        stream = remote['Body']
        while True:
            chunk = stream.read(1024)
            if not chunk:
                break
            local_file.write(chunk)

    return local_file.name
def save(self, commit=True):
    """Save the quote and move a pre-uploaded picture into place.

    The instance is always saved (the parent ``save`` is called with
    ``commit=True`` regardless of ``commit``). If ``delete_picture`` is set
    and a picture exists, it is deleted. If ``picture_path`` is set, the
    file at that path under ``settings.MEDIA_ROOT`` is stored as the
    picture and the temporary file is removed.

    :return: the saved model instance.
    """
    instance = super(InspirationQuoteForm, self).save(commit=True)

    if self.cleaned_data["delete_picture"] and instance.picture:
        instance.picture.delete()

    if self.cleaned_data["picture_path"]:
        tmp_path = self.cleaned_data["picture_path"]
        abs_tmp_path = os.path.join(settings.MEDIA_ROOT, tmp_path)

        filename = InspirationQuote._meta.get_field("picture").upload_to(
            instance, tmp_path)
        # Close the handle before removing the file: an open handle leaks a
        # descriptor and blocks os.remove() on Windows.
        with open(abs_tmp_path, "rb") as tmp_file:
            instance.picture.save(filename, File(tmp_file), False)

        os.remove(abs_tmp_path)

    instance.save()
    return instance
def download_cv_pdf(request, cv_id):
    """Render a CV to PDF and return it as an attachment.

    The ``cv/cv_pdf.html`` template is rendered with the CV and the media
    and static roots, then converted by ``pisa.pisaDocument`` directly into
    the response.
    """
    cv = get_object_or_404(CV, pk=cv_id)

    response = HttpResponse(content_type="application/pdf")
    response["Content-Disposition"] = "attachment; filename=%s_%s.pdf" % (
        cv.first_name, cv.last_name)

    template_context = {
        "cv": cv,
        "MEDIA_ROOT": settings.MEDIA_ROOT,
        "STATIC_ROOT": settings.STATIC_ROOT,
    }
    html = render_to_string("cv/cv_pdf.html", template_context)

    # The converter writes the PDF bytes straight into the response object.
    source = StringIO(html.encode("UTF-8"))
    pisa.pisaDocument(source, response, encoding="UTF-8")

    return response
def test_validate(self):
    """Config.validate() rejects invalid limits and a missing MEDIA_ROOT."""
    from django.conf import settings

    config = Config()
    with self.assertRaises(ValueError):
        # Invalid limit_log
        config.limit_log = 0
        config.validate()

    config = Config()
    with self.assertRaises(ValueError):
        # Invalid ranking_limit_disp
        config.ranking_limit_disp = 0
        config.validate()

    # The global setting is mutated below; restore it afterwards so the
    # change does not leak into other tests.
    self.addCleanup(setattr, settings, 'MEDIA_ROOT', settings.MEDIA_ROOT)

    config = Config()
    with self.assertRaises(ValueError):
        # Invalid MEDIA_ROOT
        config.media_directory = 'visitor'
        settings.MEDIA_ROOT = None
        config.validate()
def export(self, request, queryset):
    """
    Download selected photos as ZIP
    """
    zip_subdir = 'photos'
    zip_filename = '{}.zip'.format(zip_subdir)
    zip_file = os.path.join(settings.MEDIA_ROOT, PHOTOLOGUE_DIR, zip_filename)

    # Start from a clean archive; a missing file is not an error.
    try:
        os.remove(zip_file)
    except OSError:
        pass

    with zipfile.ZipFile(zip_file, "a") as archive:
        for photo in queryset.all():
            image_path = photo.image.path
            if not os.path.isfile(image_path):
                continue
            member = os.path.join(zip_subdir, os.path.basename(image_path))
            archive.write(image_path, member)

    # The ?v=<timestamp> query string is appended to the download link.
    download_url = urllib.parse.urljoin(
        settings.MEDIA_URL, PHOTOLOGUE_DIR + '/' + zip_filename)
    link = 'Photos download link: <a href="{0}?v={1}">{0}</a>'.format(
        download_url, time())
    messages.add_message(request, messages.INFO, mark_safe(link))
def render_pdf(self):
    """Render this invoice to PDF and attach the file to the instance.

    The ODT template is filled with the invoice data, saved as a PDF in
    ``<MEDIA_ROOT>/invoice_pdfs/`` (created if missing), ``self.pdf`` is
    pointed at it and the instance is saved.
    """
    # Without a customer reference, fall back to the sponsoring id.
    if self.sponsoring.billingReferenceOptOut:
        billRef = str(self.sponsoring.id)
    else:
        billRef = self.sponsoring.billingReference

    context = {
        "POSTAL_ADDRESS": self.sponsoring.getBillingAddress(),
        "INVOICE_NUMBER": self.invoiceNumber,
        "INVOICE_REF": billRef,
        "DUE_DATE": self.dueDate.strftime("%d.%m.%Y"),
        "PACKAGE_NAME": self.sponsoring.package.name,
        "PACKAGE_DESCRIPTION": "\n".join(self.sponsoring.getPacketDescription()),
        "VAT": str(settings.INVOICE_VAT),
        "PRICE_NET": str(self.sponsoring.package.price.quantize(Decimal("0.01"))),
        "PRICE_GROSS": str(self.sponsoring.package.getPriceGross()),
        "VAT_TOTAL": str(self.sponsoring.package.getVATAmount()),
    }
    temp = odtemplate.ODTTemplate(self.template.template.path)
    temp.render(context)

    # Join path components instead of concatenating strings, so a
    # MEDIA_ROOT without a trailing slash still yields the right directory.
    pdf_dir = os.path.join(settings.MEDIA_ROOT, "invoice_pdfs")
    try:
        os.makedirs(pdf_dir)
    except OSError:
        # Already existing (possibly created concurrently) is fine;
        # anything else is a real error.
        if not os.path.isdir(pdf_dir):
            raise

    filename = self.getInvoiceFilename()
    temp.savePDF(os.path.join(pdf_dir, filename))
    self.pdf.name = "invoice_pdfs/" + filename
    self.save()
def get_cloud_rate(scene_name):
    """Return the cloud cover of a scene as a float.

    The value comes from the scene's local ``_MTL.txt`` file when present,
    otherwise from the scene's EarthExplorer metadata page.
    """
    sat = 'L%s' % scene_name[2]
    mtl_path = join(settings.MEDIA_ROOT, sat, scene_name, scene_name + '_MTL.txt')

    if not isfile(mtl_path):
        # No local MTL file: fall back to the online metadata page.
        url_code = get_metadata_code(scene_name)
        page = PyQuery(
            'http://earthexplorer.usgs.gov/metadata/%s/%s/' % (url_code, scene_name)
        )
        text = page.text()
        tail = text[text.find('Cloud Cover '):]
        return float(tail.split(' ')[2])

    with open(mtl_path, 'r') as mtl:
        rates = [
            float(line.split(' = ')[-1])
            for line in mtl
            if 'CLOUD_COVER' in line
        ]
    return rates[0]
def test_creation(self):
    """Check scene string/day/dir helpers, status update and creation."""
    scene = self.scene

    self.assertEqual(scene.__str__(), 'L8 001-001 01/01/15')
    self.assertEqual(scene.day(), '001')
    expected_dir = join(settings.MEDIA_ROOT, 'L8/LC80010012015001LGN00')
    self.assertEqual(scene.dir(), expected_dir)

    scene.status = 'downloaded'
    scene.save()
    self.assertEqual(scene.status, 'downloaded')

    new_scene = {
        'path': '001',
        'row': '001',
        'sat': 'L8',
        'date': date(2015, 1, 17),
        'name': 'LC80010012015017LGN00',
        'status': 'downloading',
    }
    Scene.objects.create(**new_scene)

    self.assertEqual(Scene.objects.count(), 4)
def setUp(self):
    """Create two scheduled downloads and a scene; ensure MEDIA_ROOT exists."""
    day_of_year = date.today().timetuple().tm_yday
    self.today_number = three_digit(day_of_year)

    schedule = ScheduledDownload.objects
    self.sd = schedule.create(path='220', row='066')
    self.sd2 = schedule.create(path='999', row='002')

    scene_fields = {
        'path': '220',
        'row': '066',
        'sat': 'L8',
        'date': date(2015, 1, 1),
        'name': 'LC82200662015001LGN00',
        'cloud_rate': 20.3,
        'status': 'downloading',
    }
    self.scene = Scene.objects.create(**scene_fields)

    media_root = settings.MEDIA_ROOT
    if not exists(media_root):
        makedirs(media_root)
def form_valid(self, form):
    """Save the uploaded CSV and import its rows as producers.

    Each row is expected to hold: dni, paternal surname, maternal surname,
    given names. Rows that are blank, have an empty dni, or have fewer than
    four columns are skipped. Names are upper-cased. A producer is created
    only when none exists for the dni. Per-row failures are ignored so one
    bad row does not abort the import.

    :return: redirect to the producers master list.
    """
    data = form.cleaned_data
    docfile = data['archivo']
    form.save()
    csv_filepathname = os.path.join(settings.MEDIA_ROOT, 'archivos', str(docfile))

    # Context manager: the CSV handle is closed once the import is done.
    with open(csv_filepathname) as csv_file:
        dataReader = csv.reader(csv_file, delimiter=',', quotechar='"')
        for fila in dataReader:
            # Blank lines parse to [] and short rows lack name columns;
            # indexing them would raise IndexError and abort the import.
            if len(fila) < 4 or fila[0] == "":
                continue
            dni = fila[0]
            appaterno = fila[1].upper()
            apmaterno = fila[2].upper()
            nombres = fila[3].upper()
            try:
                Productor.objects.get_or_create(
                    dni=dni,
                    defaults={'apellido_paterno': appaterno,
                              'apellido_materno': apmaterno,
                              'nombres': nombres})
            except Exception:
                # Deliberate best-effort: skip rows that fail to import.
                # ``Exception`` (not a bare except) lets KeyboardInterrupt
                # and SystemExit propagate.
                continue
    return HttpResponseRedirect(reverse('administracion:maestro_productores'))
def tabla_encabezado(self, styles):
    """Build the requirement header table: logo, title with code, spacer.

    :param styles: accepted for interface compatibility; not used here.
    :return: a one-row, three-column ``Table`` with the company logo (or a
        text placeholder), the requirement title and an empty cell.
    """
    sp = ParagraphStyle('parrafos',
                        alignment=TA_CENTER,
                        fontSize=14,
                        fontName="Times-Roman")
    requerimiento = self.requerimiento
    try:
        archivo_imagen = os.path.join(settings.MEDIA_ROOT, str(EMPRESA.logo))
        imagen = Image(archivo_imagen, width=90, height=50, hAlign='LEFT')
    except Exception:
        # Best-effort logo: fall back to a text placeholder. ``Exception``
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        # The parenthesised print is valid on both Python 2 and Python 3.
        print(u"Ingresa acá ")
        imagen = Paragraph(u"LOGO", sp)
    nro = Paragraph(u"REQUERIMIENTO DE BIENES Y SERVICIOS<br/>N°" + requerimiento.codigo, sp)
    encabezado = [[imagen, nro, '']]
    tabla_encabezado = Table(encabezado, colWidths=[4 * cm, 11 * cm, 4 * cm])
    # NOTE(review): ReportLab documents TOP/MIDDLE/BOTTOM for VALIGN;
    # 'CENTER' is kept as-is -- confirm the intended value.
    tabla_encabezado.setStyle(TableStyle(
        [
            ('ALIGN', (0, 0), (1, 0), 'CENTER'),
            ('VALIGN', (0, 0), (1, 0), 'CENTER'),
        ]
    ))
    return tabla_encabezado
def tabla_encabezado_consolidado(self, grupos):
    """Build the consolidated-report header table: logo next to the title.

    :param grupos: truthy selects the "by groups and accounts" title, falsy
        the "by products" title.
    :return: a one-row ``Table``, vertically centred, with the logo (or a
        text placeholder) and the title paragraph.
    """
    sp = ParagraphStyle('parrafos',
                        alignment=TA_CENTER,
                        fontSize=14,
                        fontName="Times-Roman")
    try:
        archivo_imagen = os.path.join(settings.MEDIA_ROOT, str(EMPRESA.logo))
        imagen = Image(archivo_imagen, width=90, height=50, hAlign='LEFT')
    except Exception:
        # Best-effort logo: fall back to a text placeholder. ``Exception``
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        imagen = Paragraph(u"LOGO", sp)
    if grupos:
        titulo = Paragraph(u"RESUMEN MENSUAL DE ALMACÉN POR GRUPOS Y CUENTAS", sp)
    else:
        titulo = Paragraph(u"RESUMEN MENSUAL DE ALMACÉN POR PRODUCTOS", sp)

    encabezado = [[imagen, titulo]]
    tabla_encabezado = Table(encabezado, colWidths=[2 * cm, 23 * cm])
    style = TableStyle(
        [
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ]
    )
    tabla_encabezado.setStyle(style)
    return tabla_encabezado
def form_valid(self, form):
    """Save the uploaded CSV and import its rows as product groups.

    Each row is expected to hold an account code and a description. Rows
    with fewer than two columns, or whose account does not exist, are
    skipped. A group is created only when none exists for the description.

    :return: redirect to the product groups list.
    """
    data = form.cleaned_data
    docfile = data['archivo']
    form.save()
    csv_filepathname = os.path.join(settings.MEDIA_ROOT, 'archivos', str(docfile))

    # Context manager: the CSV handle is closed once the import is done.
    with open(csv_filepathname) as csv_file:
        dataReader = csv.reader(csv_file, delimiter=',', quotechar='"')
        for fila in dataReader:
            # Blank lines parse to [] and short rows lack a description;
            # indexing them would raise IndexError and abort the import.
            if len(fila) < 2:
                continue
            try:
                cuenta = CuentaContable.objects.get(cuenta=fila[0])
            except CuentaContable.DoesNotExist:
                # Unknown account: nothing to attach the group to.
                continue
            descripcion = fila[1]
            # ``unicode`` is the Python 2 builtin; undecodable bytes are
            # dropped from the description.
            GrupoProductos.objects.get_or_create(
                descripcion=unicode(descripcion, errors='ignore'),
                defaults={'ctacontable': cuenta})
    return HttpResponseRedirect(reverse('productos:grupos_productos'))
def delete_test_image(image_field):
    """
    Remove a test image together with any thumbnails generated for it.

    Deprecated: emits a DeprecationWarning pointing to the
    get_sample_image() context manager. Typical call:

        delete_test_image(object_1.image_property)

    :param image_field: The image field on an object.
    :return: None.
    """
    deprecation = DeprecationWarning(
        "delete_test_image() is deprecated in favour of the "
        "get_sample_image() context manager.")
    warnings.warn(deprecation, stacklevel=2)

    # Thumbnails live in MEDIA_ROOT/thumbs and start with the image's
    # base name; remove every match.
    base_name = image_field.name.split('/')[-1]
    thumb_pattern = os.path.join(settings.MEDIA_ROOT, 'thumbs', base_name) + '*'
    for thumb_path in glob.glob(thumb_pattern):
        os.unlink(thumb_path)

    # Finally drop the stored image itself.
    image_field.delete()