我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用magic.Magic()。
def __init__(self,file,dest):
    """Inspect ``file`` with libmagic and unpack it according to its type.

    :param file: Path of the archive to inspect.
    :param dest: Directory zip members are extracted into; also the path
        handed to ``gzip.open`` in the gzip branch.
    """
    detector = magic.Magic(uncompress=False)
    description = detector.from_file(file).lower()
    if 'zip archive' in description:
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(file) as archive:
            archive.extractall(dest)
    elif 'gzip' in description:
        # NOTE(review): this opens ``dest`` rather than ``file``; kept as in
        # the original -- confirm which path is intended.
        with gzip.open(dest) as stream:
            sp3file = stream.read()
def get(self, request, *args, **kwargs):
    """Serve a decrypted file addressed by the ``path`` URL kwarg.

    URLs (as judged by ``self._is_url``) are fetched with ``requests``;
    anything else is mapped from ``MEDIA_URL`` onto ``MEDIA_ROOT`` and read
    from disk. The payload goes through ``Cryptographer.decrypted`` and is
    returned with a content type sniffed from the decrypted bytes.

    Raises ``Http404`` when the path is missing, resolves outside
    ``MEDIA_ROOT`` or does not exist on disk.
    """
    path = kwargs.get("path")
    if not path:
        # Nothing was requested.
        raise Http404

    if self._is_url(path):
        remote = requests.get(path, stream=True)
        content = remote.raw.read()
    else:
        # Collapse "../" tricks before mapping the URL prefix to the disk root.
        normalised = os.path.normpath(path)
        path = normalised.replace(settings.MEDIA_URL, settings.MEDIA_ROOT, 1)

        inside_media_root = path.startswith(settings.MEDIA_ROOT)
        if not inside_media_root:
            raise Http404
        if not os.path.exists(path):
            raise Http404

        with open(path, "rb") as f:
            content = f.read()

    content = Cryptographer.decrypted(content)
    sniffer = magic.Magic(mime=True)
    return HttpResponse(content, content_type=sniffer.from_buffer(content))
def get_filetype(data):
    """Return libmagic's description of ``data``, or ``''`` without magic.

    There are two versions of python-magic floating around, and annoyingly,
    the interface changed between versions, so we try one method and if it
    fails, then we try the other.

    NOTE: you may need to alter the magic_file for your system to point to
    the magic file.
    """
    # ``dict.has_key`` no longer exists on Python 3; ``in`` works everywhere.
    if 'magic' not in sys.modules:
        return ''
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        return ms.buffer(data)
    except Exception:
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        try:
            return magic.from_buffer(data)
        except magic.MagicException:
            # Raw string: same value as before, without invalid escapes.
            magic_custom = magic.Magic(magic_file=r'C:\windows\system32\magic')
            return magic_custom.from_buffer(data)
def create_task_log(self, task_id, case_task_log):
    """
    Post a log entry to a task, with a file attachment when one is set.

    :param task_id: Task identifier
    :param case_task_log: TheHive log
    :type case_task_log: CaseTaskLog defined in models.py
    :return: the result of ``requests.post`` for the log creation call

    Exits the process through ``sys.exit`` when the HTTP request fails.
    """
    req = self.url + "/api/case/task/{}/log".format(task_id)
    try:
        if not case_task_log.file:
            return requests.post(
                req,
                headers={'Content-Type': 'application/json'},
                data=json.dumps({'message': case_task_log.message}),
                proxies=self.proxies, auth=self.auth, verify=self.cert)

        data = {'_json': json.dumps({"message": case_task_log.message})}
        mime_type = magic.Magic(mime=True).from_file(case_task_log.file)
        # Keep the handle open only for the duration of the upload; the
        # original never closed it.
        with open(case_task_log.file, 'rb') as attachment:
            f = {'attachment': (os.path.basename(case_task_log.file),
                                attachment, mime_type)}
            return requests.post(req, data=data, files=f,
                                 proxies=self.proxies, auth=self.auth,
                                 verify=self.cert)
    except requests.exceptions.RequestException as e:
        sys.exit("Error: {}".format(e))
def magic(indata, mime=False):
    """
    Identify ``indata`` with whichever libmagic binding is available.

    The ``magic.open`` style API is tried first; when the attributes it needs
    are missing, the ``magic.Magic`` class API is used instead.
    """
    # NOTE(review): if this def sits at module level next to ``import magic``
    # the function name rebinds ``magic`` -- confirm how the module is imported.
    try:
        flags = magic.MAGIC_MIME_TYPE if mime else magic.MAGIC_NONE
        mymagic = magic.open(flags)
        mymagic.load()
    except AttributeError:
        mymagic = magic.Magic(mime)
        # Alias so both bindings are driven through ``.file``.
        mymagic.file = mymagic.from_file
    return mymagic.file(indata)
def get_input_file_type(input_file: str) -> str:
    """Classify ``input_file`` as "pdf", "image", "text" or its raw MIME type.

    The libmagic database is ``$CONDA_PREFIX/share/misc/magic.mgc`` when
    ``CONDA_PREFIX`` is set, otherwise the path in ``LIBMAGIC_FILE_PATH``.
    With neither, a warning is emitted through ``eprint`` and python-magic
    is used without an explicit database.
    """
    conda_prefix = os.environ.get("CONDA_PREFIX")
    if conda_prefix:
        magic_file = "{}/share/misc/magic.mgc".format(conda_prefix)
    else:
        magic_file = os.environ.get("LIBMAGIC_FILE_PATH")

    if magic_file:
        detector = magic.Magic(magic_file=magic_file, mime=True)
    else:
        eprint("Magic file was not found so python-magic will probably fail. Set LIBMAGIC_FILE_PATH environment variable with path to 'magic.mgc' file (usually '/usr/share/misc/magic.mgc'.")
        detector = magic.Magic(mime=True)
    mime_type = detector.from_file(input_file)

    parts = mime_type.split("/")
    # Guard the subtype lookup: a result without "/" used to raise IndexError.
    subtype = parts[1] if len(parts) > 1 else ""
    if subtype == "pdf":
        return "pdf"
    if parts[0] in ("image", "text"):
        return parts[0]
    return mime_type
def __init__(self, filepath):
    """Load ``filepath`` and, for DOS/PE executables, parse it with pefile.

    :param filepath: Path of the sample to load.
    """
    self.path = filepath
    self.filename = os.path.basename(filepath)
    # NOTE(review): text mode is kept from the original; binary samples may
    # need 'rb' -- confirm what consumers of ``stream`` expect.
    with open(filepath, 'r') as handle:
        self.stream = handle.read()
    if magic.Magic(mime=True).from_file(filepath) == 'application/x-dosexec':
        try:
            self.pe = pefile.PE(filepath)
            self.pedict = self.pe.dump_dict()
        except Exception:
            # Format inside the call: ``print(...) % x`` fails on Python 3.
            print('Failed processing %s' % filepath)

# Magic
def magic(self):
    """Return libmagic's textual description of the file at ``self.path``."""
    identifier = magic.Magic()
    return identifier.from_file(self.path)
def mimetype(self):
    """Return the MIME type libmagic reports for the file at ``self.path``."""
    identifier = magic.Magic(mime=True)
    return identifier.from_file(self.path)

# FileType
def __init__(self):
    """Create the MIME-type detector stored on ``self.magic``."""
    mime_detector = Magic(mime=True)
    self.magic = mime_detector
def __init__(self, config):
    """Keep ``config``, start with no twitter client, build a MIME detector."""
    self.config, self.twitter = config, None
    mime_detector = Magic(mime=True)
    self.magic = mime_detector
def main(argv=None):
    """Scan one file through a SAVAPI service and print every result line.

    :param argv: full argument vector (program name first); defaults to
        ``sys.argv``.
    :return: ``2`` when argument parsing exits, otherwise ``None``.
    """
    argv = sys.argv if argv is None else argv

    parser = argparse.ArgumentParser(
        description='SAVAPI Client',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('--savapi-host', dest='host', default='127.0.0.1',
                        help='SAVAPI service host')
    parser.add_argument('--savapi-port', dest='port', default=9999, type=int,
                        help='SAVAPI service port')
    parser.add_argument('file', help='Absolute path of file')

    try:
        args = parser.parse_args(argv[1:])
    except SystemExit:
        return 2

    log = logging.getLogger()
    log.addHandler(logging.StreamHandler())
    log.setLevel(logging.DEBUG)

    archive_mimes = ('application/zip', 'application/x-tar', 'application/rar')

    # Prefer content sniffing; fall back to the extension-based guess.
    if magic:
        mimetype = magic.Magic(mime=True).from_file(args.file)
    else:
        mimetype, _ = MimeTypes().guess_type(args.file)

    with SAVAPIClient(args.host, args.port) as savapi:
        log.info(savapi.command('SET', 'PRODUCT', '10776'))
        if mimetype in archive_mimes:
            log.info(savapi.command('SET', 'ARCHIVE_SCAN', '1'))

        for result in savapi.scan(args.file):
            print('{} {} <<< {}'.format(result.code, result.definition, result.data))
            if int(result.code) in [319, 350]:
                savapi.command('QUIT')
def __init__(self):
    """Initialise the ``magic.Magic`` base for plain MIME-type detection."""
    base_options = {'mime': True, 'mime_encoding': False, 'keep_going': False}
    magic.Magic.__init__(self, **base_options)
def get(self, request, *args, **kwargs):
    """Return the most recent stored version of ``self.object`` over HTTP.

    The content type is sniffed from the stored file with the magic database
    at ``settings.MAGIC_BIN``. Raises ``Http404`` when no version exists.
    """
    try:
        versions = FileVersion.objects.filter(file_id=self.object.id)
        newest = versions.latest('created_at')
        detector = magic.Magic(mime=True, magic_file=settings.MAGIC_BIN)
        content_type = detector.from_file(newest.data.path)
        return HttpResponse(newest.data.read(), content_type=content_type)
    except FileVersion.DoesNotExist:
        raise Http404
def clean_data(self):
    """Validate the uploaded ``data`` field: MIME type first, then size.

    :return: the cleaned upload (``None`` when no file was submitted)
    :raises forms.ValidationError: the MIME type is not allowed for the
        folder, or the file exceeds the configured maximum size
    """
    upload = self.cleaned_data['data']
    # The original only checked for None after already dereferencing the
    # file; bail out before touching it.
    if upload is None:
        return upload

    # We ensure the file has an allowed mime type
    allowed_mime_types = [allowed.mime_type
                          for allowed in self.folder.allowed_types.all()]
    mime = magic.Magic(mime=True, magic_file=settings.MAGIC_BIN)
    if mime.from_file(upload.temporary_file_path()) not in allowed_mime_types:
        raise forms.ValidationError("Ce type de fichier n'est pas autorisé")

    max_size = settings.PORTAILVA_APP['file']['file_max_size']
    if upload.size > max_size:
        raise forms.ValidationError("Votre fichier est trop lourd, la limite autorisée est de " + str(max_size // (1024 * 1024)) + "Mo")
    return upload
def get_file_encoding(file_path):
    """Detect the encoding of a file with libmagic.

    :param file_path: Path to file
    :type file_path: str
    :returns: encoding
    :rtype: str
    """
    return magic.Magic(mime_encoding=True).from_file(file_path)
def __init__(self, robotname, session, datalayer):
    """Store collaborators, read settings and open the optional robots log.

    ``robotslogfd`` is an append-mode file object when the ``Logging`` /
    ``Robotslog`` setting is truthy, otherwise ``None``.
    """
    self.robotname, self.session, self.datalayer = robotname, session, datalayer
    self.max_tries = config.read('Robots', 'MaxTries')
    self.in_progress = set()
    self.magic = magic.Magic(flags=magic.MAGIC_MIME_TYPE)

    log_path = config.read('Logging', 'Robotslog')
    self.robotslog = log_path
    self.robotslogfd = open(self.robotslog, 'a') if self.robotslog else None
def test_magic():
    '''
    The python filemagic package requires the OS libmagic package,
    so let's test it to be sure nothing's missing
    '''
    samples = (
        ('%PDF-1.3\n', 'application/pdf'),
        ('<html>\n', 'text/html'),
    )
    with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
        for payload, expected_mime in samples:
            assert m.id_buffer(payload) == expected_mime
def get_mime_type(filename):
    """
    Returns the MIME type associated with a particular audio file.

    python-magic is used when it can be imported. Otherwise the type is
    guessed from the file extension, with ``'unknown'`` for anything outside
    the small built-in table.
    """
    try:
        import magic
    except ImportError:
        # Warn once only; the flag is stored on the function object.
        if getattr(get_mime_type, 'warn', True):
            logger.warning('Python package "magic" could not be loaded, '
                'possibly because system library "libmagic" could not be '
                'found. We are falling back on our own heuristics.')
            get_mime_type.warn = False
        ext = os.path.splitext(filename)[1].lower()
        return {
            '.wav' : 'audio/x-wav',
            '.mp3' : 'audio/mpeg',
            '.flac' : 'audio/x-flac'
        }.get(ext, 'unknown')

    # Read off magic numbers and return MIME types
    mime_magic = magic.Magic(mime=True)

    def _sniff(path):
        # Some python-magic versions hand back bytes; normalise to text.
        ftype = mime_magic.from_file(path)
        return ftype.decode('utf-8') if isinstance(ftype, bytes) else ftype

    ftype = _sniff(filename)
    # Follow symlinks (possibly chained) until a real file type shows up.
    current_filename = filename
    while ftype == 'inode/symlink':
        target = os.readlink(current_filename)
        # A relative target is relative to the link's own directory, not the
        # CWD; os.path.join leaves absolute targets untouched.
        current_filename = os.path.join(os.path.dirname(current_filename), target)
        ftype = _sniff(current_filename)
    return ftype
def get(self, request, key=None, **kwargs):
    """Return the image stored under ``key`` with a sniffed content type.

    Only the first 1024 bytes are inspected; the file object is rewound
    before it is handed to the response.
    """
    image_fp = _get_image_fp(key)
    detector = Magic(mime=True)
    header = image_fp.read(1024)
    content_type = detector.from_buffer(header)
    image_fp.seek(0)
    return HttpResponse(image_fp, content_type=content_type)
def __init__(self, **attributes):
    """Populate the object from keyword attributes (or a nested ``json`` dict).

    For ``dataType == 'file'`` the first entry of ``data`` is treated as a
    path and turned into an ``attachment`` tuple of (basename, open binary
    file object, MIME type); any other data is stored unchanged.
    """
    if attributes.get('json', False):
        attributes = attributes['json']

    self.dataType = attributes.get('dataType', None)
    self.message = attributes.get('message', None)
    self.tlp = attributes.get('tlp', 2)
    self.tags = attributes.get('tags', [])
    self.ioc = attributes.get('ioc', False)

    data = attributes.get('data', [])
    if self.dataType != 'file':
        self.data = data
    else:
        attachment = (
            os.path.basename(data[0]),
            open(data[0], 'rb'),
            magic.Magic(mime=True).from_file(data[0]),
        )
        self.data = [{'attachment': attachment}]
def _prepare_file_data(self, file_path):
    """Return ``"<basename>;<mime type>;<base64 content>"`` for ``file_path``."""
    with open(file_path, "rb") as file_artifact:
        name = os.path.basename(file_path)
        detector = magic.Magic(mime=True)
        mime = detector.from_file(file_path)
        raw = file_artifact.read()
        encoded_string = base64.b64encode(raw)
    encoded_text = encoded_string.decode()
    return "{};{};{}".format(name, mime, encoded_text)
def run_restore(self, args):
    """Run the restore operation.

    Reads QR-code images from ``args.infile`` (a directory, or a single PNG
    file), decodes every frame and writes its payload at the frame's recorded
    position in ``args.outfile``. When ``args.sha256`` is set, the SHA-256 of
    the finished output file is printed.

    Raises ``Exception`` when ``args.infile`` is a file that is not detected
    as ``image/png``.
    """
    if os.path.isdir(args.infile):
        self.logger.info('Setting up PngDirImporter')
        importer = PngDirImporter(args.infile, args.fnamepattern)
    else:
        m = magic.Magic(mime=True, uncompress=True)
        ftype = m.from_file(args.infile)
        if ftype == "image/png":
            importer = ImageImporter(args.infile)
        else:
            raise Exception('Could not detect import type of file %s' % args.infile)

    decodefunc = base64.b85decode  # FIXME: add arg

    with open(args.outfile, "wb") as outfile:
        for image in importer:
            # zbarlight returns None (not an empty list) when no code is
            # found in the image; treat that as "nothing to write".
            encdatalist = zbarlight.scan_codes('qrcode', image) or []
            for encdata in encdatalist:
                frame = decodefunc(encdata)
                bindata, position = self.unframe_data(frame)
                outfile.seek(position)
                outfile.write(bindata)

    self.logger.info('Finished importing')

    # Can't calculate during writing because we may be reading pieces
    # out of order
    if args.sha256:
        hashfunc = hashlib.sha256()
        with open(args.outfile, "rb") as outfile:
            # Hash in fixed-size chunks instead of loading the whole file.
            for chunk in iter(lambda: outfile.read(65536), b''):
                hashfunc.update(chunk)
        print('SHA-256 of output: %s' % hashfunc.hexdigest())
def mimeFrom(self,path):
    """Return the MIME type libmagic detects for the file at ``path``."""
    return magic.Magic(mime=True).from_file(path)
def process(self):
    """Return the MIME type libmagic detects for ``self.buffer``."""
    return fmagic.Magic(mime=True).from_buffer(self.buffer)
def upload(self, filename, configuration=None, metadata=None, transcript=None):
    """
    Upload new media to the service as an attachment.
    HTTP POST on /media

    :param filename: Media file attached to the request.
    :param configuration: VoicebaseMediaConfiguration
    :param metadata: VoicebaseMediaMeta
    :param transcript: attached transcript (accepted but not sent by this method)
    :return: VoicebaseMedia
    :raises requests.HTTPError: via ``raise_for_status`` on an error response
    """
    data = {}
    if metadata:
        data['metadata'] = str(metadata)
    if configuration:
        data['configuration'] = str(configuration)

    # Determine mime type
    m = magic.Magic(mime=True)
    mime_type = m.from_file(filename)

    # Open in binary mode: media is not text, and text mode breaks the
    # upload on Python 3.
    with open(filename, 'rb') as handle:
        file_info = [('media', (filename, handle, mime_type))]
        rq = requests.Request(b'POST', self.full_url('base'), data=data,
                              headers=self.session.headers, files=file_info)
        prepared_rq = rq.prepare()
        response = self.session.send(prepared_rq)

    response.raise_for_status()
    jsn = response.json()
    log.debug('Upload response: {}'.format(jsn))
    return VoicebaseMedia(jsn, api=self.api)
def _get_file_type(full_targ_path):
    """Return (and print) the file type of a target sample via python-magic.

    :param full_targ_path: full path of the target sample
    :return: the detected type as a string
    """
    try:
        magicObj = magic.Magic(magic_file=r'C:/Program Files (x86)/GnuWin32/share/misc/magic', mime=True)
        magic_out = str(magicObj.from_file(full_targ_path))
        # print() with one argument behaves the same on Python 2 and 3.
        print(magic_out)
    except AttributeError:
        # Fall back to the module-level helper when ``magic.Magic`` is absent.
        magic_out = str(magic.from_file(full_targ_path))
        print(magic_out+" ERROR?!?!?!!?")
    return(magic_out)
def contenttype(payload):
    """Return the MIME type libmagic detects for the in-memory ``payload``."""
    return magic.Magic(mime=True).from_buffer(payload)
def _importMagic():
    """Return a MIME-detecting ``magic.Magic``, or ``None``.

    ``None`` is returned when ``config.enable_magic`` is false or when
    importing / constructing libmagic fails.
    """
    if not config.enable_magic:
        return None
    try:
        import magic
        detector = magic.Magic(magic_file=config.magic_file, mime=True)
    except Exception:
        l.debug("libmagic cannot be imported")
        return None
    # In the original this message sat after the ``return`` and never ran.
    l.debug("libmagic enabled.")
    return detector
def from_file(fname, uncompress=False):
    """Identify the file at ``fname``.

    Uses the native binding ``ms`` when ``native`` is set, otherwise a
    ``magic.Magic`` instance.

    :param uncompress: ask ``magic.Magic`` to look inside compressed files
    """
    if native:
        return ms.file(fname)
    # The first positional parameter of python-magic's Magic is ``mime``, so
    # ``Magic(uncompress)`` toggled MIME output instead. The default call is
    # unchanged; the keyword is only passed when explicitly requested.
    m = magic.Magic(uncompress=True) if uncompress else magic.Magic()
    return m.from_file(fname)
def from_buffer(data, uncompress=False):
    """Identify the in-memory ``data``.

    Uses the native binding ``ms`` when ``native`` is set, otherwise a
    ``magic.Magic`` instance.

    :param uncompress: ask ``magic.Magic`` to look inside compressed data
    """
    if native:
        return ms.buffer(data)
    # The first positional parameter of python-magic's Magic is ``mime``, so
    # ``Magic(uncompress)`` toggled MIME output instead. The default call is
    # unchanged; the keyword is only passed when explicitly requested.
    m = magic.Magic(uncompress=True) if uncompress else magic.Magic()
    return m.from_buffer(data)
def guess_file_type(self, path):
    """Return libmagic's description of ``path``.

    The detector is created on first use and cached on ``self._mimedb``.
    """
    try:
        detector = self._mimedb
    except AttributeError:
        detector = self._mimedb = magic.Magic()
    return maybe_decode(detector.from_file(path))
def guess_encoding(self, path):
    """Return the encoding libmagic reports for ``path``.

    The detector is created on first use and cached on
    ``self._mimedb_encoding``.
    """
    try:
        detector = self._mimedb_encoding
    except AttributeError:
        detector = self._mimedb_encoding = magic.Magic(mime_encoding=True)
    return maybe_decode(detector.from_file(path))
def get_mime(self):
    """Return the MIME type of ``self.path``, or ``''`` if detection fails.

    The ``magic.open`` API is tried first, then the ``magic.Magic`` class API.
    """
    # ``except Exception`` instead of bare ``except`` so KeyboardInterrupt
    # and SystemExit still propagate.
    try:
        ms = magic.open(magic.MIME)
        ms.load()
        return ms.file(self.path)
    except Exception:
        pass
    try:
        mime = magic.Magic(mime=True)
        return mime.from_file(self.path)
    except Exception:
        return ''
def handle(self, *args, **options):
    """Create a project for the target binary, chosen by its file type.

    The first (magic database, regex) pair whose regex matches libmagic's
    description of the target selects the project configuration class and
    architecture. Returns the result of ``call_command``.

    Raises ``CommandError`` if the target is not a file or matches nothing.
    """
    # An absolute path simplifies symlink creation later on.
    target_path = os.path.realpath(options['target'][0])
    if not os.path.isfile(target_path):
        raise CommandError('Target %s does not exist' % target_path)

    default_magic = Magic()
    cgc_magic = Magic(magic_file=CGC_MAGIC)
    magic_checks = [(cgc_magic, CGC_REGEX, CGCProjectConfiguration, 'i386')]
    magic_checks += [
        (default_magic, regex, config_class, arch)
        for regex, config_class, arch in (
            (ELF32_REGEX, LinuxProjectConfiguration, 'i386'),
            (ELF64_REGEX, LinuxProjectConfiguration, 'x86_64'),
            (DLL32_REGEX, WindowsDLLProjectConfiguration, 'i386'),
            (DLL64_REGEX, WindowsDLLProjectConfiguration, 'x86_64'),
            (PE32_REGEX, WindowsProjectConfiguration, 'i386'),
            (PE64_REGEX, WindowsProjectConfiguration, 'x86_64'),
            (MSDOS_REGEX, WindowsProjectConfiguration, 'i386'),
        )
    ]

    for checker, regex, proj_config_class, arch in magic_checks:
        description = checker.from_file(target_path)
        if not regex.match(description):
            continue
        # First match wins; the user instructions are returned.
        options['target'] = target_path
        options['target_arch'] = arch
        return call_command(Project(proj_config_class), **options)

    raise CommandError('%s is not a valid target for S2E analysis' % target_path)
def __init__(self):
    """Set up the stock magic database and an optional custom one.

    The custom database is ``file-magic.def`` in the directory of
    ``sys.argv[0]``; when it is missing a warning is printed and
    ``oMagicCustom`` is ``None``.
    """
    self.oMagic = magic.Magic(magic_file=r'C:\Program Files (x86)\GnuWin32\share\misc\magic')

    script_dir = os.path.dirname(sys.argv[0])
    filename = os.path.join(script_dir, 'file-magic.def')
    if not os.path.isfile(filename):
        print('Warning: custom magic file not found: %s' % filename)
        self.oMagicCustom = None
    else:
        # self.oMagicCustom=magic.Magic(magic_file=r'mymagic', keep_going=True)
        self.oMagicCustom = magic.Magic(magic_file=filename)
def validate_video(value):
    """Reject uploads whose first 1024 bytes are not detected as ``video/*``.

    The underlying file is rewound after sniffing. Raises ``ValidationError``
    with ``app_settings.INVALID_VIDEO_ERROR_MESSAGE`` for non-video content.
    """
    if os.name != 'nt':
        mime = magic.from_buffer(value.file.read(1024), mime=True)
    else:
        # On Windows an explicit magic database named 'magic' is used.
        magic_object = magic.Magic(magic_file='magic', mime=True)
        mime = magic_object.from_buffer(value.file.read(1024))
    value.file.seek(0)

    is_video = mime.startswith('video/')
    if not is_video:
        raise ValidationError(_(app_settings.INVALID_VIDEO_ERROR_MESSAGE))
def get_files_types(self):
    """
    Return the files inside the APK with their associated types (by using python-magic)

    Every file's CRC32 is recorded in ``self.files_crc32`` as a side effect.
    Without the ``magic`` module, or when detection yields ``None``, the type
    is ``"Unknown"``.

    :rtype: a dictionary
    """
    try:
        import magic
    except ImportError:
        # no lib magic !
        for name in self.get_files():
            content = self.zip.read(name)
            self.files_crc32[name] = crc32(content)
            self.files[name] = "Unknown"
        return self.files

    # Already computed on a previous call.
    if self.files != {}:
        return self.files

    # python-magic exposes ``MagicException``; otherwise use the
    # ``magic.open`` style API.
    if hasattr(magic, "MagicException"):
        identify = magic.Magic(magic_file=self.magic_file).from_buffer
    else:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        identify = ms.buffer

    for name in self.get_files():
        content = self.zip.read(name)
        detected = identify(content)
        if detected is None:
            # Nothing detected: record "Unknown" rather than passing None
            # on to _patch_magic.
            self.files[name] = "Unknown"
        else:
            self.files[name] = self._patch_magic(content, detected)
        self.files_crc32[name] = crc32(content)
    return self.files
def get_files_types(self):
    """
    Return the files inside the APK with their associated types (by using python-magic)

    Every file's CRC32 is recorded in ``self.files_crc32`` as a side effect.
    Without the ``magic`` module, or when detection yields ``None``, the type
    is ``"Unknown"``.

    :rtype: a dictionary
    """
    try:
        import magic
    except ImportError:
        # no lib magic !
        for name in self.get_files():
            content = self.zip.read(name)
            self.files_crc32[name] = crc32(content)
            self.files[name] = "Unknown"
        return self.files

    # Already computed on a previous call.
    if self.files != {}:
        return self.files

    # python-magic exposes ``MagicException``; otherwise use the
    # ``magic.open`` style API.
    if hasattr(magic, "MagicException"):
        identify = magic.Magic(magic_file=self.magic_file).from_buffer
    else:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        identify = ms.buffer

    for name in self.get_files():
        content = self.zip.read(name)
        self.files[name] = identify(content)
        if self.files[name] is None:
            self.files[name] = "Unknown"
        else:
            self.files[name] = self._patch_magic(content, self.files[name])
        self.files_crc32[name] = crc32(content)
    return self.files
def post(self):
    """
    Receives file and options, checks file mimetype, validates options and
    creates converting task

    Returns a dict with the queued task's ``id`` and ``status``; every
    validation failure goes through ``abort(400, ...)``.
    """
    if "file" not in request.files:
        return abort(400, message="file field is required")

    with NamedTemporaryFile(delete=False, prefix=app.config["MEDIA_PATH"]) as tmp_file:
        request.files["file"].save(tmp_file)
        # libmagic reads the file by name below, so buffered bytes must be
        # on disk first.
        tmp_file.flush()
        remove_file.schedule(
            datetime.timedelta(seconds=app.config["ORIGINAL_FILE_TTL"]),
            tmp_file.name)

        with Magic() as magic:  # detect mimetype
            mimetype = magic.from_file(tmp_file.name)
        if mimetype not in app.config["SUPPORTED_MIMETYPES"]:
            return abort(400, message="Not supported mimetype: '{0}'".format(mimetype))

        options = request.form.get("options", None)
        if options:  # options validation
            options = ujson.loads(options)
            formats = options.get("formats", None)
            if not isinstance(formats, list) or not formats:
                return abort(400, message="Invalid 'formats' value")
            supported_formats = app.config["SUPPORTED_MIMETYPES"][mimetype]["formats"]
            for fmt in formats:
                if fmt not in supported_formats:
                    message = "'{0}' mimetype can't be converted to '{1}'"
                    return abort(400, message=message.format(mimetype, fmt))

            thumbnails = options.get("thumbnails", None)
            if thumbnails:
                if not isinstance(thumbnails, dict):
                    return abort(400, message="Invalid 'thumbnails' value")
                thumbnails_size = thumbnails.get("size", None)
                if not isinstance(thumbnails_size, str) or not thumbnails_size:
                    return abort(400, message="Invalid 'size' value")
                try:
                    # "WxH" must split into exactly two integers.
                    (width, height) = map(int, thumbnails_size.split("x"))
                except ValueError:
                    return abort(400, message="Invalid 'size' value")
                options["thumbnails"]["size"] = (width, height)
        elif mimetype == "application/pdf":
            options = {
                "formats": ["html"]
            }
        else:
            options = app.config["DEFAULT_OPTIONS"]

        task = process_document.queue(tmp_file.name, options, {
            "mimetype": mimetype,
        })
    return {
        "id": task.id,
        "status": task.status,
    }
def browse(root):
    """
    Lists root, files, and file names in root folder

    Args:
        root (str): Directory to list, joined onto ``_AEON_TOPDIR``

    Returns the rendered ``browse.html`` template. An unreadable path flashes
    a warning and falls back to the top-level listing of ``valid_paths()``.
    """
    # NOTE(review): ``root`` is joined without sanitising; confirm the caller
    # rejects absolute paths and ".." components.
    path = os.path.join(_AEON_TOPDIR, root)
    if not os.access(path, os.R_OK):
        flash('Access Denied reading {path}'.format(path=path), 'warning')
        root = '/'

    if root == '/':
        dirnames = [valid.strip('/') for valid in valid_paths()]
        return render_template('browse.html', dirpath='/', dirnames=dirnames,
                               filenames=[], parent='/')

    # Built-in next() works on Python 2.6+ and 3; ``generator.next()`` is
    # Python 2 only.
    folder = next(os.walk(path))
    parent = os.path.join(path, os.pardir).split(_AEON_TOPDIR).pop().strip('/')
    dirpath = folder[0].split(_AEON_TOPDIR).pop().strip('/')
    dirnames = [x.split(_AEON_TOPDIR).pop() for x in folder[1]]

    mime = magic.Magic(mime=True)
    files = []
    for filename in folder[2]:
        f = os.path.join(_AEON_TOPDIR, root, filename)
        # TODO: follow symlinks (including relative ones) for accurate stats
        stat = os.stat(f)
        files.append({'name': filename,
                      'size': stat.st_size,
                      'mtime': stat.st_mtime,
                      'ctime': stat.st_ctime,
                      'mimetype': mime.from_file(f),
                      'link': f})
    return render_template('browse.html', dirpath=dirpath, dirnames=dirnames,
                           files=files, parent=parent)

# TODO: constraint to POST or DELETE
def create_sym_hash(filename=None, data=None):
    """Compute a symbol hash for each non-Universal entity of a Mach-O binary.

    :param filename: optional path; when given, its bytes replace ``data``
    :param data: raw bytes of the binary
    :return: dict mapping ``"<cpu type> <file type> <magic>"`` to the MD5 hex
        digest of the sorted, comma-joined names of symbols that are not
        stabs, are external and have ``n_type == '0x00'``. ``None`` when
        there is no data, the data is not detected as Mach-O, or parsing
        fails.
    """
    if filename:
        with open(filename, 'rb') as f:
            data = f.read()
    if not data:
        return

    # Only the first 1000 bytes are handed to libmagic.
    with magic.Magic() as m:
        filetype = m.id_buffer(data[0:1000])
    if 'Mach-O' not in filetype:
        print("Data provided is not a valid Mach-O filetype")
        return

    macho_parser = MachOParser(data)
    try:
        macho_parser.parse()
    except MachOParserError as e:
        print("Error {}".format(e))
        return

    sym_dict = {}
    for entity in macho_parser.entities:
        if entity.magic_str == 'Universal':
            continue
        entity_string = "{} {} {}".format(entity.cpu_type_str,
                                          entity.filetype_str,
                                          entity.magic_str)
        # Default is bytes: ``''.decode()`` does not exist on Python 3.
        sym_list = [
            sym.get('string', b'').decode()
            for cmd in entity.cmdlist
            if cmd['cmd'] == MachOEntity.LC_SYMTAB
            for sym in cmd['symbols']
            if not sym['is_stab']
            and sym['external'] is True
            and sym['n_type'] == '0x00'
        ]
        symhash = md5(','.join(sorted(sym_list)).encode()).hexdigest()
        sym_dict[entity_string] = symhash
    return sym_dict