我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用magic.from_buffer()。
def magic_mime_from_buffer(buffer: bytes) -> str:
    """
    Detect the mimetype of a byte buffer with the ``magic`` library.

    .. warning::

       :exc:`.OptionalPackageRequirementError` is raised when the module-level
       ``magic`` name is ``None`` (presumably because ``python-magic`` could
       not be imported).

    :param buffer: buffer from header of file.
    :return: The mimetype
    """
    if magic is not None:
        return magic.from_buffer(buffer, mime=True)

    raise OptionalPackageRequirementError('python-magic')  # pragma: no cover

# wand image
def validate_file(form, field):
    """Reject an upload whose extension or detected MIME type is forbidden.

    Three checks run in order against the ``ForbiddenExtension`` and
    ``ForbiddenMimeType`` tables: the extension in the uploaded filename,
    the MIME type ``magic`` detects from the first 1024 bytes, and the
    extension ``mimetypes`` associates with that detected MIME type.

    :param form: the form being validated (not used here).
    :param field: file field; ``field.data`` must expose ``filename``,
        ``read`` and ``seek``.
    :raises ValidationError: when any check matches a forbidden entry.
    """
    # File cannot end with a forbidden extension
    file_extension = os.path.splitext(field.data.filename)[1]
    if file_extension:
        # Strip the leading dot before comparing with the stored value.
        forbidden_ext = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == file_extension[1:]).first()
        if forbidden_ext is not None:
            raise ValidationError('Extension not allowed')

    mimetype = magic.from_buffer(field.data.read(1024), mime=True)
    # File Pointer returns to beginning
    field.data.seek(0, 0)

    # Check for permitted mimetype
    forbidden_mime = ForbiddenMimeType.query.filter(
        ForbiddenMimeType.mimetype == mimetype).first()
    if forbidden_mime is not None:
        raise ValidationError('File MimeType not allowed')

    # Also reject a file whose *real* type maps to a forbidden extension,
    # whatever the filename claims.
    extension = mimetypes.guess_extension(mimetype)
    if extension is not None:
        forbidden_real = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == extension[1:]).first()
        if forbidden_real is not None:
            raise ValidationError('Extension not allowed')
def write(self, data):
    """
    Record a block of data and hand it to ``self._write_chunk``.

    On the first block (``self._size == 0``) the MIME type is sniffed from
    the data with ``magic``. The running size and the MD5/SHA1 digests are
    updated on every call.

    :param data: the bytes to write.
    :raises IOError: if the file has already been closed.
    """
    if self._closed:
        raise IOError('I/O operation on closed file.')

    if self._size == 0:
        # First block. See if we can get a more specific mime type by
        # examining the data.
        mime = magic.from_buffer(data, mime=True)
        # self.mime is determined by the filename, mime by magic. Keep the
        # filename-based guess when magic only offers the generic
        # octet-stream type; otherwise prefer what magic found.
        if not self.mime or mime != 'application/octet-stream':
            self.mime = mime

    self._size += len(data)
    self._md5.update(data)
    self._sha1.update(data)
    self._write_chunk(data)
def clean_file(self):
    """Validate the uploaded ``file`` field and return it.

    Depending on ``self.check_extension`` and ``self.check_content_type``
    the extension is matched against the ``acceptFileTypes`` pattern and
    the sniffed MIME type against ``allowedContentTypes``; accepted content
    types are additionally limited to ``maxFileSize``.

    :return: the uploaded file object, rewound to its start if it was read.
    :raises forms.ValidationError: with code-like messages
        ``'acceptFileTypes'`` or ``'maxFileSize'``.
    """
    content = self.cleaned_data[u'file']
    extension = os.path.splitext(content.name)[1]

    if self.check_extension:
        if re.match(self._options['acceptFileTypes'], extension, flags=re.I) is None:
            raise forms.ValidationError('acceptFileTypes')

    if self.check_content_type:
        content_type = magic.from_buffer(content.read(1024), mime=True)
        # Rewind so whoever consumes the returned file starts at byte 0.
        content.seek(0)
        if content_type.lower() not in self._options['allowedContentTypes']:
            raise forms.ValidationError("acceptFileTypes")
        # Use the public ``size`` attribute rather than the private ``_size``.
        if content.size > self._options['maxFileSize']:
            raise forms.ValidationError("maxFileSize")

    return content
def load_index(self):
    """Fetch the index object from the bucket and decode it.

    The object body is sniffed with ``magic``: plain text (or JSON, as
    newer libmagic releases report it) is parsed directly, zlib data is
    decompressed first, and an empty object yields an empty index.

    :return: the decoded index, or ``{}`` when the object is empty or the
        request raised ``ClientError``.
    :raises ValueError: if the detected content type is not recognised.
    """
    try:
        resp = self.boto.get_object(
            Bucket=self.bucket,
            Key=self.index_path(),
        )
        body = resp['Body'].read()
        content_type = magic.from_buffer(body, mime=True)

        # Newer libmagic versions identify JSON text as application/json
        # instead of text/plain; both are an uncompressed index.
        if content_type in ('text/plain', 'application/json'):
            logger.debug('Detected plain text encoding for index')
            return json.loads(body.decode('utf-8'))
        elif content_type == 'application/zlib':
            logger.debug('Detected zlib encoding for index')
            body = zlib.decompress(body)
            return json.loads(body.decode('utf-8'))
        elif content_type == 'application/x-empty':
            return {}
        else:
            raise ValueError('Unknown content type for index', content_type)
    except (ClientError):
        return {}
def get_filetype(data):
    """Describe ``data`` using whichever python-magic interface is available.

    There are two versions of python-magic floating around, and annoyingly,
    the interface changed between versions, so we try one method and if it
    fails, then we try the other.

    NOTE: you may need to alter the magic_file for your system to point to
    the magic file.

    :param data: buffer to identify.
    :return: the description reported by magic, or ``''`` when the ``magic``
        module has not been imported.
    """
    # ``in`` works on both Python 2 and 3; ``dict.has_key`` is Python 2 only.
    if 'magic' in sys.modules:
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            return ms.buffer(data)
        except Exception:
            try:
                return magic.from_buffer(data)
            except magic.MagicException:
                # Raw string: the backslashes are path separators, not escapes.
                magic_custom = magic.Magic(magic_file=r'C:\windows\system32\magic')
                return magic_custom.from_buffer(data)
    return ''
def store(self):
    """Validate ``self.data`` and save it as an ``Image``.

    The file is named after the SHA-1 of its content plus the extension
    guessed from the sniffed MIME type.

    :return: the absolute URL of the stored image.
    :raises TooBigMedia: if the data is ``MAX_SIZE`` bytes or larger.
    :raises InvalidMimeType: if the sniffed MIME type is not allowed.
    """
    if len(self.data) >= self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

    mime = magic.from_buffer(self.data, mime=True)
    if mime not in self.allowed_mimetypes:
        raise InvalidMimeType(mime)

    self.extension = mimetypes.guess_extension(mime)
    # weirdness from mimetypes
    if self.extension == '.jpe':
        self.extension = '.jpeg'

    checksum = hashlib.sha1(self.data).hexdigest()
    # guess_extension() returns None for MIME types it does not know; avoid
    # producing a filename that literally ends in "None".
    fn = '{}{}'.format(checksum, self.extension or '')

    img = Image(organization=self.organization)
    img.file.save(fn, ContentFile(self.data))
    return img.get_absolute_url()
def new_video(self, fb_user, fields):
    """Post the video in ``fields['video']`` to the Facebook video endpoint.

    ``fields`` is updated in place: missing ``'message'`` and ``'title'``
    keys are filled with empty strings. The video file is always closed
    before returning. A failing request is logged, not raised.

    :param fb_user: object providing ``access_token``.
    :param fields: mapping with a seekable file object under ``'video'``.
    """
    if 'message' not in fields:
        fields['message'] = ''
    if 'title' not in fields:
        fields['title'] = ''

    video_file = fields['video']
    video_file.seek(0)
    mime_type = magic.from_buffer(video_file.read(), mime=True)
    video_file.seek(0)

    # The upload name is a random UUID plus the MIME subtype as extension.
    post_data = [('access_token', (None, fb_user.access_token)),
                 ('source', (str(uuid4()) + '.' + mime_type.split('/')[1], video_file)),
                 ('message', (None, fields['message']))]
    try:
        fb_request_url = Config.get("API_BASE_URI_VIDEO") + "/me/videos"
        requests.post(fb_request_url, files=post_data)
    except Exception:
        # Translate the template first, then format it, so the catalogue
        # lookup sees the constant message id.
        # NOTE(review): post_data contains the access token; consider
        # redacting it before logging.
        log.error(_("A failure occurred while posting on Facebook : "
                    "called with data: {}").format(post_data))
    finally:
        video_file.close()
def _get_filetype(self, data):
    """Gets filetype, uses libmagic if available.

    The ``magic.open`` interface is tried first; if that fails,
    ``magic.from_buffer`` is used instead.

    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    # Pre-bind so the cleanup below never touches an unbound name when
    # magic.open() itself fails.
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Best effort: a failing close must not hide the result.
                pass

    return file_type
def check_word_or_excel(self, fileobj, detected_type, extension):
    """
    Returns proper mimetype in case of word or excel files.

    The textual description magic gives for the first ``READ_SIZE`` bytes
    is searched for known Word/Excel markers; for generic Office files the
    filename extension decides. ``fileobj`` is rewound afterwards.
    """
    word_markers = ('Microsoft Word', 'Microsoft Office Word',
                    'Microsoft Macintosh Word')
    excel_markers = ('Microsoft Excel', 'Microsoft Office Excel',
                     'Microsoft Macintosh Excel')
    office_markers = ('Microsoft OOXML',)

    description = magic.from_buffer(fileobj.read(READ_SIZE))
    fileobj.seek(0)

    def mentions(markers):
        # True when any marker occurs in the magic description.
        return any(marker in description for marker in markers)

    if mentions(word_markers):
        return 'application/msword'
    if mentions(excel_markers):
        return 'application/vnd.ms-excel'

    if mentions(office_markers) or detected_type == 'application/vnd.ms-office':
        # Generic Office container: fall back to the filename extension.
        if extension in ('.doc', '.docx'):
            detected_type = 'application/msword'
        if extension in ('.xls', '.xlsx'):
            detected_type = 'application/vnd.ms-excel'

    return detected_type
def run(self, directory):
    """Walk ``directory`` and process every executable not yet indexed.

    Each file's type is sniffed from its first 1024 bytes. Files that
    ``is_executable`` accepts are hashed with MD5 and passed to
    ``self.do_run`` unless ``self.is_file_indexed`` already knows the hash.

    :param directory: root directory to scan (symlinks are followed).
    """
    for root, dirs, files in os.walk(directory, followlinks=True):
        for name in files:
            filename = os.path.join(root, name)
            try:
                # Binary mode: these are arbitrary files, and text mode
                # would fail to decode most executables on Python 3.
                with open(filename, "rb") as handle:
                    file_type = magic.from_buffer(handle.read(1024))
            except Exception:
                log("Error reading file %s: %s" % (filename, str(sys.exc_info()[1])))
                continue

            if not is_executable(file_type):
                continue

            with open(filename, "rb") as handle:
                md5_hash = md5(handle.read()).hexdigest()

            if not self.is_file_indexed(md5_hash):
                self.do_run(filename, file_type)
            else:
                log("File already indexed %s" % name)

#-------------------------------------------------------------------------------
def test_html_to_md(self):
    """HTML to MD"""
    self.set_original_document_from_file('demo.html')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'md',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    # Pins the MIME type magic is expected to report for the converted output.
    self.assertEqual(magic.from_buffer(converted, mime=True), 'text/x-c++')
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_html_to_rst(self):
    """HTML to RST"""
    self.set_original_document_from_file('demo.html')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    # Pins the MIME type magic is expected to report for the converted output.
    assert magic.from_buffer(converted, mime=True) == 'text/x-c'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_html_to_docx(self):
    """HTML to DOCX"""
    self.set_original_document_from_file('demo.html')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'html',
        'to': 'docx',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in response.content_type
    # The payload itself must also be recognised as a DOCX document.
    self.assertEqual(
        magic.from_buffer(converted, mime=True),
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    )
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []

#
# From MD
#
def test_md_to_html(self):
    """MD to HTML"""
    self.set_original_document_from_file('demo.md')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'md',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'text/html' in response.content_type
    # The payload itself must also be recognised as HTML.
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_md_to_rst(self):
    """MD to RST"""
    self.set_original_document_from_file('demo.md')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'md',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    # Pins the MIME type magic is expected to report for the converted output.
    assert magic.from_buffer(converted, mime=True) == 'text/x-c'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_rst_to_html(self):
    """RST to HTML"""
    self.set_original_document_from_file('demo.rst')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'text/html' in response.content_type
    # The payload itself must also be recognised as HTML.
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_rst_to_md(self):
    """RST to MD"""
    self.set_original_document_from_file('demo.rst')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'md',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    # Pins the MIME type magic is expected to report for the converted output.
    assert magic.from_buffer(converted, mime=True) == 'text/plain'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_rst_to_docx(self):
    """RST to DOCX"""
    self.set_original_document_from_file('demo.rst')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'rst',
        'to': 'docx',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in response.content_type
    # The payload itself must also be recognised as a DOCX document.
    assert magic.from_buffer(converted, mime=True) == 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []

#
# From DOCX
#
def test_docx_to_html(self):
    """DOCX to HTML"""
    self.set_original_document_from_file('demo.docx')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'docx',
        'to': 'html',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'text/html' in response.content_type
    # The payload itself must also be recognised as HTML.
    assert magic.from_buffer(converted, mime=True) == 'text/html'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_docx_to_rst(self):
    """DOCX to RST"""
    self.set_original_document_from_file('demo.docx')
    form = {
        'document': (self.original_document, self.original_document_name),
        'from': 'docx',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    # Pins the MIME type magic is expected to report for the converted output.
    assert magic.from_buffer(converted, mime=True) == 'text/plain'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def test_convert_from_string(self):
    """Convert from string"""
    self.set_original_document_from_string('#1')
    # Here the document is submitted as a plain string, not a file tuple.
    form = {
        'document': self.original_document_content_string,
        'from': 'md',
        'to': 'rst',
    }
    response = self.app.post(
        '/convert/',
        buffered=True,
        content_type='multipart/form-data',
        data=form,
    )
    converted = response.data

    assert '200' in response.status
    assert 'octet-stream' in response.content_type
    # Pins the MIME type magic is expected to report for the converted output.
    assert magic.from_buffer(converted, mime=True) == 'text/plain'
    # No temporary files may remain in the conversion folder.
    assert os.listdir(app.config['CONVERSION_FOLDER']) == []
def _get_filetype(data):
    """Gets filetype, uses libmagic if available.

    The ``magic.open`` interface (with ``MAGIC_SYMLINK``) is tried first;
    if that fails, ``magic.from_buffer`` is used instead.

    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    # Pre-bind so the cleanup below never touches an unbound name when
    # magic.open() itself fails.
    ms = None
    try:
        ms = magic.open(magic.MAGIC_SYMLINK)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Best effort: a failing close must not hide the result.
                pass

    return file_type
def __call__(self, data):
    """Validate an uploaded file's size and sniffed content type.

    :param data: uploaded file exposing ``size``, ``read`` and ``seek``.
    :raises ValidationError: with code ``'max_size'``, ``'min_size'`` or
        ``'content_type'`` when the corresponding limit is violated.
    """
    if self.max_size is not None and data.size > self.max_size:
        params = {
            'max_size': filesizeformat(self.max_size),
            'size': filesizeformat(data.size),
        }
        raise ValidationError(self.error_messages['max_size'], 'max_size', params)

    if self.min_size is not None and data.size < self.min_size:
        params = {
            # Was ``self.mix_size``: a typo that raised AttributeError
            # instead of the intended validation error.
            'min_size': filesizeformat(self.min_size),
            'size': filesizeformat(data.size)
        }
        raise ValidationError(self.error_messages['min_size'], 'min_size', params)

    if self.content_types is not None and len(self.content_types):
        content_type = magic.from_buffer(data.read(), mime=True)
        data.seek(0)  # seek to start for future mime checks by django

        if content_type not in self.content_types:
            params = {'content_type': content_type}
            raise ValidationError(self.error_messages['content_type'], 'content_type', params)
def upload_image():
    """Store an uploaded image on disk and enqueue it for processing.

    Reads the ``name`` form field and the ``data`` file from the current
    request. The file is rejected unless magic's description of it contains
    ``'image'``.

    :return: an ``HTTPResponse`` with status 200 on success, 400 for
        missing fields or a disallowed file type, 500 if queueing fails.
    """
    name = request.forms.get('name')
    data = request.files.get('data')
    if not (name and data and data.file):
        return HTTPResponse(status=400, body=json.dumps({'error': 'missing fields'}))

    raw = data.file.read()
    images_dir = config.get('storage', 'imagesdir')
    save_path = "{path}/{file}".format(path=images_dir, file=data.filename)

    # Create the *directory*, not a directory named after the target file
    # (which would make the open() below fail).
    if not os.path.exists(images_dir):
        os.makedirs(images_dir)

    if 'image' not in magic.from_buffer(raw):
        return HTTPResponse(status=400, body=json.dumps({'error': 'file type is not allowed'}))

    # Binary mode: the upload is raw image bytes.
    with open(save_path, 'wb') as open_file:
        open_file.write(raw)

    if queue.add_to_queue(queue_name='images', image=save_path):
        return HTTPResponse(status=200, body=json.dumps({'status': 'Image Stored'}))
    return HTTPResponse(status=500, body=json.dumps({'error': 'Internal Server Error'}))
def get_type(data):
    """Return magic's description of ``data``.

    The ``magic.open`` interface is tried first; if that fails,
    ``magic.from_buffer`` is used instead.

    :param data: buffer to identify.
    :return: the file type description, or ``''`` if both interfaces fail.
    """
    # Pre-bind so the cleanup below never touches an unbound name when
    # magic.open() itself fails.
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except Exception:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return ''
    finally:
        if ms is not None:
            try:
                ms.close()
            except Exception:
                # Best effort: a failing close must not hide the result.
                pass

    return file_type
def validate_file_type_and_size(upload):
    """Check an upload against the allowed file formats and a 5 MB limit.

    Allowed formats are the ``(name, mimetype)`` pairs found in
    ``settings.FILE_ALIASES['*']['fileformats']``. Both problems are
    reported together when both apply.

    :param upload: uploaded file exposing ``read``, ``seek`` and ``size``.
    :return: the same ``upload`` object, rewound to its start.
    :raises ValidationError: wrapping one error per violated rule.
    """
    file_max_mb = 5
    max_size = file_max_mb * 10**6

    fileformats = settings.FILE_ALIASES['*']['fileformats']
    allowed_mimetypes = [mimetype for name, mimetype in fileformats]
    names = [name for name, mimetype in fileformats]

    errors = []

    filetype = magic.from_buffer(upload.read(), mime=True)
    # Rewind: the caller gets this object back and will read it again.
    upload.seek(0)

    if filetype.lower() not in allowed_mimetypes:
        # Translate the template first, then fill it in, so the catalogue
        # lookup sees the constant message id.
        msg = _('Unsupported file format. Supported formats are {}.').format(
            ', '.join(names)
        )
        errors.append(ValidationError(msg))

    if upload.size > max_size:
        msg = _('File should be at most {} MB').format(file_max_mb)
        errors.append(ValidationError(msg))

    if errors:
        raise ValidationError(errors)

    return upload
def __init__(self, file_obj, orig_filename=None):
    """Wrap raw file content and derive its name, extension and MIME type.

    :param file_obj: the file content, wrapped in a ``BytesIO``.
    :param orig_filename: original filename, if known; otherwise the final
        filename defaults to ``'unknownfile.bin'`` and the extension to
        ``None``.
    """
    self.file_obj = BytesIO(file_obj)
    self.orig_filename = orig_filename
    if self.orig_filename:
        self.final_filename = self.orig_filename
    else:
        self.final_filename = 'unknownfile.bin'

    self.log_details = {'origFilename': self.orig_filename}
    self.log_string = ''

    if self.orig_filename:
        self.extension = os.path.splitext(self.orig_filename)[1]
    else:
        self.extension = None

    try:
        mt = magic.from_buffer(self.file_obj.getvalue(), mime=True)
    except UnicodeEncodeError as e:
        # FIXME: The encoding of the file is broken (possibly UTF-16)
        mt = ''
        self.log_details.update({'UnicodeError': e})

    # magic may hand back bytes or text depending on version; normalise to
    # text when it is bytes, and keep the value as-is otherwise.
    try:
        self.mimetype = mt.decode("utf-8")
    except (AttributeError, UnicodeDecodeError):
        self.mimetype = mt

    if self.mimetype and '/' in self.mimetype:
        # maxsplit=1 keeps the two-name unpacking safe even if the subtype
        # contains another slash.
        self.main_type, self.sub_type = self.mimetype.split('/', 1)
    else:
        self.main_type = ''
        self.sub_type = ''
def genReqStr( params ):
    """Serialise ``params`` into a multipart/mixed request body.

    File-like values (anything exposing ``name``, ``read`` and
    ``mimetype``) are embedded with their content type, which is sniffed
    with magic when declared as ``application/octet-stream``. Lists emit
    one part per element; every other value becomes a single part.

    NOTE(review): relies on Python 2 names (``unicode``, ``str.decode``).

    :param params: mapping of field name to value.
    :return: tuple ``(body, boundary)``, both byte strings.
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    boundary_str = "---" + ''.join(random.choice(alphabet) for _ in range(13))
    boundary = boundary_str.encode("UTF-8")
    delimiter = b'\n--' + boundary

    def plain_part_header(field_name):
        # Header shared by every non-file part.
        return (b'\nContent-Type: application/octet-stream\nMIME-Version: 1.0\nContent-Disposition: form-data; name="'
                + field_name.encode("UTF-8") + b'"\n\n')

    res = b'Content-Type: multipart/mixed; boundary="' + boundary + b'"\nMIME-Version: 1.0\n'
    res += delimiter

    for key, value in list(params.items()):
        is_file = all(attr in dir(value) for attr in ("name", "read", "mimetype"))
        if is_file:
            dataVal = value.read()
            mime = value.mimetype
            if mime == "application/octet-stream":
                # Declared type is generic; sniff a better one from content.
                mime = magic.from_buffer(dataVal, mime=True)
            res += b"".join([
                b'\nContent-Type: ',
                mime.encode("UTF-8"),
                b'\nMIME-Version: 1.0\nContent-Disposition: form-data; name="',
                key.encode("UTF-8"),
                b'"; filename="',
                os.path.basename(value.name).decode(sys.getfilesystemencoding()).encode("UTF-8"),
                b'"\n\n',
            ])
            res += dataVal
            res += delimiter
        elif isinstance(value, list):
            for val in value:
                res += plain_part_header(key)
                if isinstance(val, unicode):
                    res += val.encode("UTF-8")
                else:
                    res += str(val)
                res += delimiter
        else:
            res += plain_part_header(key)
            if isinstance(value, unicode):
                res += unicode(value).encode("UTF-8")
            else:
                res += str(value)
            res += delimiter

    res += b'--\n'
    return (res, boundary)
def __call__(self, value):
    """Validate the MIME type sniffed from the first 1024 bytes of a file.

    :param value: file object exposing ``read`` and ``seek``.
    :raises ValidationError: if ``self.allowed_mimetypes`` is non-empty and
        the detected type is not in it.
    """
    # Check the content type
    mimetype = magic.from_buffer(value.read(1024), mime=True)
    # Rewind so later consumers of the file start at byte 0.
    value.seek(0)

    if self.allowed_mimetypes and mimetype not in self.allowed_mimetypes:
        message = self.mime_message % {
            'mimetype': mimetype,
            'allowed_mimetypes': ', '.join(self.allowed_mimetypes)
        }
        raise ValidationError(message)
def create_in_memory_image(image, name, size):
    """
    Resizes the image and saves it as InMemoryUploadedFile object

    :param image: source image passed through to ``image_resize``.
    :param name: filename given to the uploaded-file object.
    :param size: target size passed through to ``image_resize``.
    :return: the InMemoryUploadedFile object with the image data
    """
    output = io.BytesIO()  # create an io object
    # resize the image and save it to the io object
    image_resize(image, output, size)

    image_bytes = output.getvalue()
    # get MIME type of the image
    mime = magic.from_buffer(image_bytes, mime=True)

    # Rewind so the uploaded file is read from its first byte.
    output.seek(0)

    # The size argument is the payload length in bytes; sys.getsizeof()
    # would report the Python object's memory footprint instead.
    return uploadedfile.InMemoryUploadedFile(output, 'ImageField', name,
                                             mime, len(image_bytes), None)
def validate_file(self):
    """Return True when the body file's sniffed MIME type is allowed.

    Only the first 16 bytes are inspected; the file is rewound afterwards.
    """
    upload = self.body.data
    header = upload.read(16)
    upload.seek(0)
    detected = magic.from_buffer(header, mime=True)
    return detected in self.allowed_extensions
def validate_file(self):
    """Return True when the testcase file's sniffed MIME type is allowed.

    Only the first 16 bytes are inspected; the file is rewound afterwards.
    """
    upload = self.testcase.data
    header = upload.read(16)
    upload.seek(0)
    detected = magic.from_buffer(header, mime=True)
    return detected in self.allowed_extensions
def validate_file(self):
    """Return True when the code file is sniffed as some ``text/*`` type.

    Only the first 16 bytes are inspected; the file is rewound afterwards.
    """
    upload = self.code.data
    header = upload.read(16)
    upload.seek(0)
    detected = magic.from_buffer(header, mime=True)
    return detected.startswith('text/')
def detect(self, path):
    """Detect the type of ``path`` from its first 128 bytes.

    :param path: filesystem path to inspect.
    :return: ``DIRECTORY`` for directories, a ``TypeString`` for a
        recognised MIME type, otherwise ``None``.
    """
    if os.path.isdir(path):
        return DIRECTORY

    with open(path, 'rb') as fd:
        header = fd.read(128)
        mimetype = magic.from_buffer(header, mime=True)
        recognised = bool(mimetype) and mimetype not in UNKNOWN_MIMETYPE
        if recognised:
            return TypeString(mimetype)

    return None
def mimetype(self):
    """
    Readable mimetype, guessed from the first 1024 bytes of the file.
    """
    self.file.open()
    header = self.file.read(1024)
    return magic.from_buffer(header, mime=True)
def add_location(self, location):
    """
    Add a media location to this subject.

    - **location** can be an open :py:class:`file` object, a path to a
      local file, or a :py:class:`dict` containing MIME types and URLs for
      remote media.

    A file object passed in is read completely and closed by this method.

    Examples::

        subject.add_location(my_file)
        subject.add_location('/data/image.jpg')
        subject.add_location({'image/png': 'https://example.com/image.png'})
    """
    # isinstance() rather than an exact type() match, so dict and str
    # subclasses (e.g. OrderedDict) are accepted as well.
    if isinstance(location, dict):
        self.locations.append(location)
        self._media_files.append(None)
        return

    if isinstance(location, (str,) + _OLD_STR_TYPES):
        f = open(location, 'rb')
    else:
        f = location

    try:
        media_data = f.read()
        if MEDIA_TYPE_DETECTION == 'magic':
            media_type = magic.from_buffer(media_data, mime=True)
        else:
            # NOTE(review): imghdr.what() returns None for unrecognised
            # data, which yields the type 'image/None' here.
            media_type = 'image/{}'.format(imghdr.what(None, media_data))
        self.locations.append(media_type)
        self._media_files.append(media_data)
    finally:
        f.close()
def new_picture(self, fb_user, fields):
    """Post the image in ``fields['image']`` to the user's Facebook photos.

    ``fields`` is updated in place: a missing ``'message'`` key is filled
    with an empty string. A failing or unsuccessful request is logged, not
    raised. The image file is always closed, and the trigger is fired
    afterwards.

    :param fb_user: object providing ``access_token`` and ``username``.
    :param fields: mapping with a seekable file object under ``'image'``.
    """
    if 'message' not in fields:
        fields['message'] = ''

    image_file = fields['image']
    image_file.seek(0)
    mime_type = magic.from_buffer(image_file.read(), mime=True)
    image_file.seek(0)

    # The upload name is a random UUID plus the MIME subtype as extension.
    post_data = [('access_token', (None, fb_user.access_token)),
                 ('source', (str(uuid4()) + '.' + mime_type.split('/')[1], image_file)),
                 ('caption', (None, fields['message'])),
                 ('message', (None, fields['message']))]

    # Pre-bind: the error handler below must work even when the request
    # itself raised and no response object exists.
    resp = None
    try:
        fb_request_url = Config.get("API_BASE_URI") + "/{}/photos".format(fb_user.username)
        resp = requests.post(fb_request_url, files=post_data)
        payload = resp.json() if resp.ok else {}
        if "post_id" not in payload:
            raise ValueError("Facebook response did not contain a post_id")
        log.info(_("A new Post with ID {} published!").format(payload['post_id']))
    except Exception:
        response_body = None
        if resp is not None:
            try:
                response_body = resp.json()
            except ValueError:
                # Body is not JSON; log the raw text instead.
                response_body = resp.text
        # NOTE(review): post_data contains the access token; consider
        # redacting it before logging.
        log.error(_("A failure occurred while posting on Facebook : "
                    "called with data: {}\n response: {}").format(post_data, response_body))
    finally:
        image_file.close()

    webhook_data = {
        "time": 0,
        "id": "101915710270588",
        "changed_fields": ["statuses"],
        "uid": fb_user.username
    }
    self.fire_trigger(webhook_data)
def __call__(self, fileobj):
    """Validate an uploaded file's sniffed MIME type and its extension.

    :param fileobj: file object exposing ``read``, ``seek`` and ``name``.
    :raises ValidationError: with code ``'invalid_type'`` when the detected
        MIME type is not allowed, or ``'invalid_extension'`` when the
        lower-cased extension is not in the configured list.
    """
    header = fileobj.read(READ_SIZE)
    detected_type = magic.from_buffer(header, mime=True)
    extension = os.path.splitext(fileobj.name.lower())[1]

    # seek back to start so a valid file could be read
    # later without resetting the position
    fileobj.seek(0)

    # some versions of libmagic do not report proper mimes for Office subtypes
    # use detection details to transform it to proper mime
    ambiguous_types = ('application/octet-stream', 'application/vnd.ms-office')
    if detected_type in ambiguous_types:
        detected_type = self.check_word_or_excel(fileobj, detected_type, extension)

    if detected_type not in self.allowed_mimes:
        # use more readable file type names for feedback message
        readable_types = (mime.split('/')[1] for mime in self.allowed_mimes)
        raise ValidationError(
            message=self.type_message,
            params={
                'detected_type': detected_type,
                'allowed_types': ', '.join(readable_types),
            },
            code='invalid_type',
        )

    extension_rejected = bool(self.allowed_exts) and extension not in self.allowed_exts
    if extension_rejected:
        raise ValidationError(
            message=self.extension_message,
            params={
                'extension': extension,
                'allowed_extensions': ', '.join(self.allowed_exts),
            },
            code='invalid_extension',
        )
def get_filetype(data):
    """Describe ``data`` using whichever python-magic interface is available.

    There are two versions of python-magic floating around, and annoyingly,
    the interface changed between versions, so we try one method and if it
    fails, then we try the other.

    :param data: buffer to identify.
    :return: the description reported by magic, or ``None`` when the
        ``magic`` module has not been imported.
    """
    # ``in`` works on both Python 2 and 3; ``dict.has_key`` is Python 2 only.
    if 'magic' not in sys.modules:
        return None

    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        return magic.from_buffer(data)
def MIME_TYPE(data, mime=True):
    """Identify ``data`` with magic, falling back to ``"none/none"``.

    :param data: buffer to identify.
    :param mime: passed through to ``magic.from_buffer``.
    :return: magic's result, or ``"none/none"`` if magic raises
        ``MagicException``.
    """
    try:
        detected = magic.from_buffer(data, mime=mime)
    except magic.MagicException:
        detected = "none/none"
    return detected
def file_is(file_description, fmt):
    """Tell whether a file or buffer is a `fmt` document.

    :file_description: Full path for a `fmt` file (``str``) or a buffer
        containing `fmt` data (``bytes``).
    :fmt: pattern searched for in the detected MIME type.
    :returns: True if is `fmt` and False otherwise, including when
        ``file_description`` is neither ``str`` nor ``bytes``.
    """
    import magic
    logger.debug("Checking filetype")

    # Pre-bind so unsupported input types fall through to False instead of
    # raising UnboundLocalError at the return statement.
    result = None
    pattern = r".*%s.*" % fmt

    if isinstance(file_description, str):
        # This means that the file_description is a path
        result = re.match(
            pattern,
            magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    elif isinstance(file_description, bytes):
        # Suppose that file_description is a buffer
        # NOTE(review): unlike the path branch, this match is case-sensitive
        # -- confirm whether that difference is intended.
        result = re.match(
            pattern,
            magic.from_buffer(file_description, mime=True)
        )
        if result:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )

    return bool(result)
def process(self):
    """Return magic's description of ``self.buffer``."""
    description = magic.from_buffer(self.buffer)
    return description
def pack_file(f):
    """Wrap the content of a file object in an ``ndef.Record``.

    The record type is the MIME type sniffed from the content; the record
    name is the file's ``name`` attribute (or ``'<stdin>'`` when absent),
    cut to at most 255 characters.

    :param f: readable file object.
    :return: the new ``ndef.Record``.
    """
    payload = f.read()
    mime_type = magic.from_buffer(payload, mime=1)
    source_name = getattr(f, 'name', '<stdin>')
    return ndef.Record(mime_type, source_name[0:255], payload)
def mimetype(file_header):
    """
    Detect mime type from the leading bytes of a file.

    file_header: first bytes of the file
    """
    detected = magic.from_buffer(file_header, mime=True)
    return detected
def mimetype(file_path):
    """
    Detect mime type by reading first 1024 bytes of file

    file_path: file to detect mime type
    """
    # read first 1024 bytes of file to determine mime type; the context
    # manager closes the handle instead of leaving it to the garbage
    # collector.
    with open(file_path, 'rb') as handle:
        header = handle.read(1024)
    return magic.from_buffer(header, mime=True)
def _magic_get_file_type(f, _):
    """Sniff the MIME type from the first 1024 bytes of ``f``.

    The file is rewound afterwards and the result is passed through
    ``maybe_decode``. The second argument is ignored.
    """
    header = f.read(1024)
    detected = magic.from_buffer(header, mime=True)
    f.seek(0)
    return maybe_decode(detected)