我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mimetypes.guess_type()。
def upload_to_s3(css_file): bucket_name = settings.AWS_BUCKET_NAME conn = S3Connection(settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY) folder = 'webpack_bundles/' bucket = conn.get_bucket(bucket_name=bucket_name) filename = css_file.split('/')[-1] file_obj = open(css_file, 'r') content = file_obj.read() key = folder + filename bucket = conn.get_bucket(bucket_name=bucket_name) mime = mimetypes.guess_type(filename)[0] k = Key(bucket) k.key = key # folder + filename k.set_metadata("Content-Type", mime) k.set_contents_from_string(content) public_read = True if public_read: k.set_acl("public-read")
def serve(self, request, path): # the following code is largely borrowed from `django.views.static.serve` # and django-filetransfers: filetransfers.backends.default fullpath = os.path.join(settings.PRIVATE_MEDIA_ROOT, path) if not os.path.exists(fullpath): raise Http404('"{0}" does not exist'.format(fullpath)) # Respect the If-Modified-Since header. statobj = os.stat(fullpath) content_type = mimetypes.guess_type(fullpath)[0] or 'application/octet-stream' if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'), statobj[stat.ST_MTIME], statobj[stat.ST_SIZE]): return HttpResponseNotModified(content_type=content_type) response = HttpResponse(open(fullpath, 'rb').read(), content_type=content_type) response["Last-Modified"] = http_date(statobj[stat.ST_MTIME]) # filename = os.path.basename(path) # response['Content-Disposition'] = smart_str(u'attachment; filename={0}'.format(filename)) return response
def send_document_file(self, input_file, entity, caption=''): """Sends a previously uploaded input_file (which should be a document) to the given entity (or input peer)""" # Determine mime-type and attributes # Take the first element by using [0] since it returns a tuple mime_type = guess_type(input_file.name)[0] attributes = [ DocumentAttributeFilename(input_file.name) # TODO If the input file is an audio, find out: # Performer and song title and add DocumentAttributeAudio ] # Ensure we have a mime type, any; but it cannot be None # 'The "octet-stream" subtype is used to indicate that a body # contains arbitrary binary data.' if not mime_type: mime_type = 'application/octet-stream' self.send_media_file( InputMediaUploadedDocument( file=input_file, mime_type=mime_type, attributes=attributes, caption=caption), entity)
def screenshot(self, options: dict = None, **kwargs: Any) -> bytes: """Take screen shot.""" options = options or dict() options.update(kwargs) screenshotType = None if 'path' in options: mimeType, _ = mimetypes.guess_type(options['path']) if mimeType == 'image/png': screenshotType = 'png' elif mimeType == 'image/jpeg': screenshotType = 'jpeg' else: raise PageError('Unsupported screenshot ' f'mime type: {mimeType}') if 'type' in options: screenshotType = options['type'] if not screenshotType: screenshotType = 'png' return await self._screenshotTask(screenshotType, options)
def serve_static(abe, path, start_response): slen = len(abe.static_path) if path[:slen] != abe.static_path: raise PageNotFound() path = path[slen:] try: # Serve static content. # XXX Should check file modification time and handle HTTP # if-modified-since. Or just hope serious users will map # our htdocs as static in their web server. # XXX is "+ '/' + path" adequate for non-POSIX systems? found = open(abe.htdocs + '/' + path, "rb") import mimetypes type, enc = mimetypes.guess_type(path) # XXX Should do something with enc if not None. # XXX Should set Content-length. start_response('200 OK', [('Content-type', type or 'text/plain')]) return found except IOError: raise PageNotFound() # Change this if you want empty or multi-byte address versions.
def add_file(self, name, file, filename=None, content_type=None): """Adds a new file to the dict. `file` can be a file name or a :class:`file`-like or a :class:`FileStorage` object. :param name: the name of the field. :param file: a filename or :class:`file`-like object :param filename: an optional filename :param content_type: an optional content type """ if isinstance(file, FileStorage): value = file else: if isinstance(file, string_types): if filename is None: filename = file file = open(file, 'rb') if filename and content_type is None: content_type = mimetypes.guess_type(filename)[0] or \ 'application/octet-stream' value = FileStorage(file, filename, name, content_type) self.add(name, value)
def checkforhtml(self, info, url): # if info.has_key('content-type'): # ctype = cgi.parse_header(info['content-type'])[0].lower() # if ';' in ctype: # # handle content-type: text/html; charset=iso8859-1 : # ctype = ctype.split(';', 1)[0].strip() # else: # if url[-1:] == "/": # return 1 # ctype, encoding = mimetypes.guess_type(url) # if ctype == 'text/html': # return 1 # else: # self.note(1, " Not HTML, mime type %s", ctype) # return 0 return 1
def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE, resumable=False): """Constructor. Args: filename: string, Name of the file. mimetype: string, Mime-type of the file. If None then a mime-type will be guessed from the file extension. chunksize: int, File will be uploaded in chunks of this many bytes. Only used if resumable=True. Pass in a value of -1 if the file is to be uploaded in a single chunk. Note that Google App Engine has a 5MB limit on request size, so you should never set your chunksize larger than 5MB, or to -1. resumable: bool, True if this is a resumable upload. False means upload in a single request. """ self._filename = filename fd = open(self._filename, 'rb') if mimetype is None: (mimetype, encoding) = mimetypes.guess_type(filename) super(MediaFileUpload, self).__init__(fd, mimetype, chunksize=chunksize, resumable=resumable)
def get_content_type(self): """Returns the ``Content-Type`` header to be used for this request. .. versionadded:: 3.1 """ mime_type, encoding = mimetypes.guess_type(self.absolute_path) # per RFC 6713, use the appropriate type for a gzip compressed file if encoding == "gzip": return "application/gzip" # As of 2015-07-21 there is no bzip2 encoding defined at # http://www.iana.org/assignments/media-types/media-types.xhtml # So for that (and any other encoding), use octet-stream. elif encoding is not None: return "application/octet-stream" elif mime_type is not None: return mime_type # if mime_type not detected, use application/octet-stream else: return "application/octet-stream"
def get_delta(asked_file_path, cached_file_path): """ Return a tuple of (body, mime_type), given two file paths """ if not exists(asked_file_path): raise InvalidAskedFile("%s not found." % asked_file_path) if not exists(cached_file_path): raise InvalidCachedFile("%s not found." % asked_file_path) with open(asked_file_path, 'r') as filep: asked_file_string = filep.read() with open(cached_file_path, 'r') as filep: cached_file_string = filep.read() body = calculate_delta(asked_file_string, cached_file_string) mime_type = "text/sw-delta" if len(body) > len(asked_file_string): body = asked_file_string mime_type, _ = guess_type(asked_file_path) return body, mime_type
def multipart_encode(vars, files, boundary = None, buffer = None): if boundary is None: boundary = mimetools.choose_boundary() if buffer is None: buffer = '' for(key, value) in vars: buffer += '--%s\r\n' % boundary buffer += 'Content-Disposition: form-data; name="%s"' % key buffer += '\r\n\r\n' + value + '\r\n' for(key, fd) in files: file_size = os.fstat(fd.fileno())[stat.ST_SIZE] filename = fd.name.split('/')[-1] contenttype = mimetypes.guess_type(filename)[0] or 'application/octet-stream' buffer += '--%s\r\n' % boundary buffer += 'Content-Disposition: form-data; name="%s"; filename="%s"\r\n' % (key, filename) buffer += 'Content-Type: %s\r\n' % contenttype # buffer += 'Content-Length: %s\r\n' % file_size fd.seek(0) buffer += '\r\n' + fd.read() + '\r\n' buffer += '--%s--\r\n\r\n' % boundary return boundary, buffer
def _create_attachment(self, filename, content, mimetype=None): """ Converts the filename, content, mimetype triple into a MIME attachment object. """ if mimetype is None: mimetype, _ = mimetypes.guess_type(filename) if mimetype is None: mimetype = DEFAULT_ATTACHMENT_MIME_TYPE attachment = self._create_mime_attachment(content, mimetype) if filename: try: filename.encode('ascii') except UnicodeEncodeError: if six.PY2: filename = filename.encode('utf-8') filename = ('utf-8', '', filename) attachment.add_header('Content-Disposition', 'attachment', filename=filename) return attachment
def encode_file(boundary, key, file): to_bytes = lambda s: force_bytes(s, settings.DEFAULT_CHARSET) filename = os.path.basename(file.name) if hasattr(file, 'name') else '' if hasattr(file, 'content_type'): content_type = file.content_type elif filename: content_type = mimetypes.guess_type(filename)[0] else: content_type = None if content_type is None: content_type = 'application/octet-stream' if not filename: filename = key return [ to_bytes('--%s' % boundary), to_bytes('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)), to_bytes('Content-Type: %s' % content_type), b'', to_bytes(file.read()) ]
def _get_encoded_attachments(self): attachments = self.get_attachments() if attachments: new_attachments = [] for attachment in attachments: if isinstance(attachment, File): attachment.seek(0) new_attachments.append((attachment.name, attachment.read(), guess_type(attachment.name)[0])) else: new_attachments.append(attachment) attachments = new_attachments return jsonpickle.dumps(attachments)
def protected_view(request, path, server="django", as_download=False): if server != "django": mimetype, encoding = mimetypes.guess_type(path) response = HttpResponse() response["Content-Type"] = mimetype if encoding: response["Content-Encoding"] = encoding if as_download: response["Content-Disposition"] = "attachment; filename={}".format( basename(path)) response[server_header(server)] = os.path.join( PROTECTED_MEDIA_LOCATION_PREFIX, path ).encode("utf8") else: response = serve( request, path, document_root=PROTECTED_MEDIA_ROOT, show_indexes=False ) return response
def getType(file): (type, _) = guess_type(file) if type is None: # Detect some unknown types if file[-12:].lower() == "video_ts.ifo": return "video/x-dvd" if file == "/media/audiocd/cdplaylist.cdpls": return "audio/x-cda" p = file.rfind('.') if p == -1: return None ext = file[p+1:].lower() if ext == "dat" and file[-11:-6].lower() == "avseq": return "video/x-vcd" return type
def guess_type(self, path, allow_directory=True): """ Guess the type of a file. If allow_directory is False, don't consider the possibility that the file is a directory. Parameters ---------- obj: s3.Object or string """ if path.endswith(".ipynb"): return "notebook" elif allow_directory and self.dir_exists(path): return "directory" else: return "file"
def get(self, path, content=True, type=None, format=None): # Get a file or directory model. self.log.debug("S3contents.GenericManager.get] path('%s') type(%s) format(%s)", path, type, format) path = path.strip('/') if type is None: type = self.guess_type(path) try: func = { "directory": self._get_directory, "notebook": self._get_notebook, "file": self._get_file, }[type] except KeyError: raise ValueError("Unknown type passed: '{}'".format(type)) return func(path=path, content=content, format=format)
def _file_model_from_path(self, path, content=False, format=None): """ Build a file model from database record. """ model = base_model(path) model["type"] = "file" if self.fs.isfile(path): model["last_modified"] = model["created"] = self.fs.lstat(path)["ST_MTIME"] else: model["last_modified"] = model["created"] = DUMMY_CREATED_DATE if content: try: content = self.fs.read(path) except NoSuchFile as e: self.no_such_entity(e.path) except GenericFSError as e: self.do_error(str(e), 500) model["format"] = format or "text" model["content"] = content model["mimetype"] = mimetypes.guess_type(path)[0] or "text/plain" if format == "base64": model["format"] = format or "base64" from base64 import b64decode model["content"] = b64decode(content) return model
def _convert_file_records(self, paths): """ Applies _notebook_model_from_s3_path or _file_model_from_s3_path to each entry of `paths`, depending on the result of `guess_type`. """ ret = [] for path in paths: # path = self.fs.remove_prefix(path, self.prefix) # Remove bucket prefix from paths if os.path.basename(path) == self.fs.dir_keep_file: continue type_ = self.guess_type(path, allow_directory=True) if type_ == "notebook": ret.append(self._notebook_model_from_path(path, False)) elif type_ == "file": ret.append(self._file_model_from_path(path, False, None)) elif type_ == "directory": ret.append(self._directory_model_from_path(path, False)) else: self.do_error("Unknown file type %s for file '%s'" % (type_, path), 500) return ret
def body_producer(boundary, files): buf = BytesIO() write = buf.write for (filename, data) in files: write('--{}\r\n'.format(boundary).encode('utf-8')) write('Content-Disposition: form-data; name="{}"; filename="{}"\r\n'.format( filename, filename).encode('utf-8')) mtype = mimetypes.guess_type(filename)[0] or 'application/octet-stream' write('Content-Type: {}\r\n'.format(mtype).encode('utf-8')) write(b'\r\n') write(data) write(b'\r\n') write('--{}--\r\n'.format(boundary).encode('utf-8')) return buf.getbuffer().tobytes()
def is_tarfile(path): """ Returns if the path belongs to a tarfile or not. .. versionadded:: 1.2.0 :param str path: path to be checked :returns: **True** if the path belongs to a tarball, **False** otherwise """ # Checking if it's a tar file may fail due to permissions so failing back # to the mime type... # # IOError: [Errno 13] Permission denied: '/vmlinuz.old' # # With python 3 insuffient permissions raises an AttributeError instead... # # http://bugs.python.org/issue17059 try: return tarfile.is_tarfile(path) except (IOError, AttributeError): return mimetypes.guess_type(path)[0] == 'application/x-tar'
def nfs_open(self, req): url = req.full_url directory_name, file_name = os.path.split(url) server_name = req.host print('FauxNFSHandler simulating mount:') print(' Remote path: {}'.format(directory_name)) print(' Server : {}'.format(server_name)) print(' Local path : {}'.format( os.path.basename(tempdir))) print(' Filename : {}'.format(file_name)) local_file = os.path.join(tempdir, file_name) fp = NFSFile(tempdir, file_name) content_type = ( mimetypes.guess_type(file_name)[0] or 'application/octet-stream' ) stats = os.stat(local_file) size = stats.st_size headers = { 'Content-type': content_type, 'Content-length': size, } return response.addinfourl(fp, headers, req.get_full_url())
def guess_mimetype(resource_path): """ Guesses the mimetype of a given resource. Args: resource_path: the path to a given resource. Returns: The mimetype string. """ mime = mimetypes.guess_type(resource_path)[0] if mime is None: return "application/octet-stream" return mime
def get(self, path): """Serve the requested resource to the client. Super class will validate credentials. Returns the file in the HTTP response, or a 404 if the resource can not be found. Args: path: path to the resource. """ abs_path = os.path.abspath(os.path.join('static', path)) if '..' in abs_path: self.response.set_status(400) return if os.path.isdir(abs_path) or abs_path.find(os.getcwd()) != 0: self.response.set_status(403) return try: with open(abs_path, 'r') as f: content_type = mimetypes.guess_type(abs_path)[0] self.response.content_type = content_type self.response.out.write(f.read()) except IOError: self.response.set_status(404)
def guess_content_type(filename, default='application/octet-stream'): """ Guess the "Content-Type" of a file. :param filename: The filename to guess the "Content-Type" of using :mod:`mimetypes`. :param default: If no "Content-Type" can be guessed, default to `default`. """ if filename: return mimetypes.guess_type(filename)[0] or default return default
def send(self, request, stream=None, timeout=None, verify=None, cert=None, proxies=None): pathname = url_to_path(request.url) resp = Response() resp.status_code = 200 resp.url = request.url try: stats = os.stat(pathname) except OSError as exc: resp.status_code = 404 resp.raw = exc else: modified = email.utils.formatdate(stats.st_mtime, usegmt=True) content_type = mimetypes.guess_type(pathname)[0] or "text/plain" resp.headers = CaseInsensitiveDict({ "Content-Type": content_type, "Content-Length": stats.st_size, "Last-Modified": modified, }) resp.raw = open(pathname, "rb") resp.close = resp.raw.close return resp
def unpack_http_url(link, location, download_dir=None, session=None, hashes=None): if session is None: raise TypeError( "unpack_http_url() missing 1 required keyword argument: 'session'" ) temp_dir = tempfile.mkdtemp('-unpack', 'pip-') # If a download dir is specified, is the file already downloaded there? already_downloaded_path = None if download_dir: already_downloaded_path = _check_download_dir(link, download_dir, hashes) if already_downloaded_path: from_path = already_downloaded_path content_type = mimetypes.guess_type(from_path)[0] else: # let's download to a tmp dir from_path, content_type = _download_http_url(link, session, temp_dir, hashes) # unpack the archive to the build dir location. even when only downloading # archives, they have to be unpacked to parse dependencies unpack_file(from_path, location, content_type, link) # a download dir is specified; let's copy the archive there if download_dir and not already_downloaded_path: _copy_file(from_path, download_dir, link) if not already_downloaded_path: os.unlink(from_path) rmtree(temp_dir)
def serve(self, request, path): response = HttpResponse() fullpath = os.path.join(settings.PRIVATE_MEDIA_ROOT, path) response['X-Accel-Redirect'] = fullpath response['Content-Type'] = mimetypes.guess_type(path)[0] or 'application/octet-stream' return response
def serve(self, request, path): fullpath = os.path.join(settings.PRIVATE_MEDIA_ROOT, path) response = HttpResponse() response['X-Sendfile'] = fullpath # From django-filer (https://github.com/stefanfoulis/django-filer/): # This is needed for lighttpd, hopefully this will # not be needed after this is fixed: # http://redmine.lighttpd.net/issues/2076 response['Content-Type'] = mimetypes.guess_type(path)[0] or 'application/octet-stream' # filename = os.path.basename(path) # response['Content-Disposition'] = smart_str(u'attachment; filename={0}'.format(filename)) return response
def add_file(self, fieldname, filename, fileHandle, mimetype=None): body = fileHandle.read() if mimetype is None: mimetype = mimetypes.guess_type(filename)[ 0] or 'application/ octet-stream' self.files.append((fieldname, filename, mimetype, body))
def open_local_file(self, req): import email.utils import mimetypes host = req.get_host() filename = req.get_selector() localfile = url2pathname(filename) try: stats = os.stat(localfile) size = stats.st_size modified = email.utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(filename)[0] headers = mimetools.Message(StringIO( 'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified))) if host: host, port = splitport(host) if not host or \ (not port and _safe_gethostbyname(host) in self.get_names()): if host: origurl = 'file://' + host + filename else: origurl = 'file://' + filename return addinfourl(open(localfile, 'rb'), headers, origurl) except OSError, msg: # urllib2 users shouldn't expect OSErrors coming from urlopen() raise URLError(msg) raise URLError('file not on local host')
def open_local_file(self, url): """Use local file.""" import mimetypes, mimetools, email.utils try: from cStringIO import StringIO except ImportError: from StringIO import StringIO host, file = splithost(url) localname = url2pathname(file) try: stats = os.stat(localname) except OSError, e: raise IOError(e.errno, e.strerror, e.filename) size = stats.st_size modified = email.utils.formatdate(stats.st_mtime, usegmt=True) mtype = mimetypes.guess_type(url)[0] headers = mimetools.Message(StringIO( 'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified))) if not host: urlfile = file if file[:1] == '/': urlfile = 'file://' + file return addinfourl(open(localname, 'rb'), headers, urlfile) host, port = splitport(host) if not port \ and socket.gethostbyname(host) in (localhost(), thishost()): urlfile = file if file[:1] == '/': urlfile = 'file://' + file return addinfourl(open(localname, 'rb'), headers, urlfile) raise IOError, ('local file error', 'not on local host')
def __init__(self, *args, **kwargs): """ Set name, ext and content_type (if possible) automatically initialize by path. """ super(File, self).__init__(*args, **kwargs) pieces = os.path.splitext(os.path.basename(str(self.path))) self.name, self.ext = pieces[0], pieces[1][1:] self.content_type = mimetypes.guess_type(self.full_path)[0]
def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE, resumable=False): """Constructor. Args: filename: string, Name of the file. mimetype: string, Mime-type of the file. If None then a mime-type will be guessed from the file extension. chunksize: int, File will be uploaded in chunks of this many bytes. Only used if resumable=True. Pass in a value of -1 if the file is to be uploaded in a single chunk. Note that Google App Engine has a 5MB limit on request size, so you should never set your chunksize larger than 5MB, or to -1. resumable: bool, True if this is a resumable upload. False means upload in a single request. """ self._filename = filename fd = open(self._filename, 'rb') if mimetype is None: # No mimetype provided, make a guess. mimetype, _ = mimetypes.guess_type(filename) if mimetype is None: # Guess failed, use octet-stream. mimetype = 'application/octet-stream' super(MediaFileUpload, self).__init__(fd, mimetype, chunksize=chunksize, resumable=resumable)