我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用magic.from_file()。
def _load_index(self):
    """Load the JSON index from disk, transparently handling gzip.

    The on-disk encoding is detected from the file content with libmagic
    rather than from the file name.

    Returns:
        The decoded JSON document, or an empty dict when the index file
        does not exist.

    Raises:
        ValueError: if the detected MIME type is neither plain text/JSON
            nor gzip.
    """
    index_path = self.index_path()
    if not os.path.exists(index_path):
        return {}

    content_type = magic.from_file(index_path, mime=True)
    # Newer libmagic releases identify uncompressed JSON as
    # 'application/json' instead of 'text/plain'; accept both so a valid
    # index is not rejected after a libmagic upgrade.
    if content_type in ('text/plain', 'application/json'):
        logger.debug('Detected plaintext encoding for reading index')
        method = open
    elif content_type in ('application/gzip', 'application/x-gzip'):
        logger.debug('Detected gzip encoding for reading index')
        method = gzip.open
    else:
        raise ValueError('Index is of unknown type', content_type)

    with method(index_path, 'rt') as fp:
        data = json.load(fp)
    return data
def get_plaintext_document_body(fpath, keep_layout=False):
    """Given a file-path to a full-text, return a list of unicode strings
    whereby each string is a line of the fulltext.

    In the case of a plain-text document, this simply means reading the
    contents in from the file. In the case of a PDF however, this means
    converting the document to plaintext.

    @param fpath: (string) - the path to the fulltext file
    @param keep_layout: passed through unchanged to
        convert_PDF_to_plaintext; not used for plain-text input.
    @return: (list) of strings - each string being a line in the document.
    @raise UnknownDocumentTypeError: if the document is not a PDF or
        plain text.
    """
    textbody = []
    mime_type = magic.from_file(fpath, mime=True)

    if mime_type == "text/plain":
        # Read raw bytes and decode explicitly: on Python 3 a text-mode
        # handle yields str objects, which have no .decode() method.
        with open(fpath, "rb") as f:
            textbody = [line.decode("utf-8") for line in f.readlines()]
    elif mime_type == "application/pdf":
        textbody = convert_PDF_to_plaintext(fpath, keep_layout)
    else:
        raise UnknownDocumentTypeError(mime_type)

    return textbody
def __init__(self, filename):
    """
    Creates a file object for a malware sample.

    :param filename: The file name of the available malware sample.
    :raises ValueError: if ``filename`` does not exist on disk.
    """
    file_missing = not os.path.exists(filename)
    if file_missing:
        raise ValueError("File {0} does not exist!".format(filename))

    # Defaults for members that are filled in later.
    self.running_entropy_data = None
    self.running_entropy_window_size = 0
    self.file_size = 0
    self.parsedfile = None

    # Sample identity and libmagic's textual description of it.
    self.filename = filename
    self.data = []
    self.filetype = magic.from_file(self.filename)

    # Read the sample, then run the type-specific parsing step.
    self._read_file()
    self._parse_file_type()
def get_type(self):
    """Return a textual description of the type of ``self.path``.

    Three strategies are tried in order: the ``magic.open`` API, then
    ``magic.from_file``, then the external ``file -b`` command.

    Returns:
        The detected type, or '' when every strategy fails. The output of
        the ``file`` fallback is returned exactly as read from the pipe.
    """
    ms = None
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.file(self.path)
    except Exception:
        try:
            file_type = magic.from_file(self.path)
        except Exception:
            try:
                import subprocess
                file_process = subprocess.Popen(
                    ['file', '-b', self.path], stdout=subprocess.PIPE)
                # communicate() drains the pipe and waits for the child,
                # so no zombie process or open pipe is left behind.
                output, _ = file_process.communicate()
                file_type = output.strip()
            except Exception:
                return ''
    finally:
        # Only close a cookie that was actually opened.
        if ms is not None:
            try:
                ms.close()
            except Exception:
                pass

    return file_type
def file_parser(fname, pages=None): if magic.from_file(fname, mime=True) == 'application/pdf': try: text_array = [] d = pdf.Document(fname) for i, p in enumerate(d, start=1): for f in p: for b in f: for l in b: text_array.append(l.text.encode('UTF-8')) if i == pages: # break after x pages break print "Processed %i pages" % (i) return '\n'.join(text_array) except Exception as e: print "PDF Parser Exception: ", e else: try: content = parser.from_file(fname)['content'] return (content or '').encode('UTF-8') except Exception as e: print "File Parser Exception: ", e
def save_file(self, msg, msg_type):
    """Store the payload of *msg* under ``storage/<channel_id>``.

    The file is first written without an extension, its MIME type is
    detected with libmagic, and it is then renamed with a matching
    extension ('.unknown' when none is known).

    :param msg: message mapping; ``msg['Text']`` is called with the
        target path (presumably it writes the payload there) and
        ``msg['NewMsgId']`` is used in the file name.
    :param msg_type: prefix used in the generated file name.
    :return: tuple ``(full path of the renamed file, MIME type)``.
    """
    storage_dir = os.path.join("storage", self.channel_id)
    if not os.path.exists(storage_dir):
        os.makedirs(storage_dir)

    base_name = "%s_%s_%s" % (msg_type, msg['NewMsgId'], int(time.time()))
    raw_path = os.path.join(storage_dir, base_name)
    msg['Text'](raw_path)

    mime = magic.from_file(raw_path, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()

    guess_ext = mimetypes.guess_extension(mime)
    if not guess_ext:
        guess_ext = ".unknown"
    if guess_ext == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", raw_path, mime)

    # JPEG files always get '.jpeg', whatever mimetypes suggests.
    if mime == "image/jpeg":
        ext = ".jpeg"
    else:
        ext = guess_ext

    fullpath = "%s%s" % (raw_path, ext)
    os.rename(raw_path, fullpath)
    self.logger.info("File saved from WeChat\nFull path: %s\nMIME: %s", fullpath, mime)
    return fullpath, mime
def file_magic(in_file):
    # Print libmagic's textual description of `in_file` to stdout, after a
    # "File Type :" label indented by two tabs.
    # This is a Python 2 print statement: the label is written before
    # magic.from_file() is evaluated.
    print "\n\t\tFile Type :", magic.from_file(in_file)
def do_sample_type_detect(datafile):
    """
    Detect the type of *datafile* with libmagic.

    Returns a tuple ``(mime type, textual description)``.
    """
    mime_type = magic.from_file(datafile, mime=True)
    description = magic.from_file(datafile)
    return mime_type, description
def _process_cache(self, split="\n", rstrip=True):
    """Yield the lines of the cached file ``self.cache``.

    The MIME type is detected with ``magic.from_file`` or, when that
    attribute is missing, with the ``magic.open`` API. Gzip and zip caches
    are handed to the matching csirtg_smrt decoder; everything else is
    read as a text file.

    :param split: separator forwarded to the gzip/zip decoders.
    :param rstrip: accepted but not used by the code visible here.
    :raises RuntimeError: if neither libmagic API is available.
    """
    try:
        ftype = magic.from_file(self.cache, mime=True)
    except AttributeError:
        try:
            cookie = magic.open(magic.MAGIC_MIME)
            cookie.load()
            ftype = cookie.file(self.cache)
        except AttributeError as e:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        ftype = ftype.decode('utf-8')

    # Choose a decoder for compressed caches; None means plain text.
    if ftype.startswith(('application/x-gzip', 'application/gzip')):
        from csirtg_smrt.decoders.zgzip import get_lines
    elif ftype == "application/zip":
        from csirtg_smrt.decoders.zzip import get_lines
    else:
        get_lines = None

    if get_lines is not None:
        for line in get_lines(self.cache, split=split):
            yield line
        return

    # all others, mostly txt, etc...
    with open(self.cache) as f:
        for line in f:
            yield line
def get_mimetype(f):
    """Return the MIME type of the file at path *f*.

    Uses ``magic.from_file`` and falls back to the ``magic.open`` API when
    that attribute is missing. On Python 2 the result is decoded to
    unicode.

    :raises RuntimeError: if neither libmagic API is available.
    """
    try:
        detected = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            cookie = magic.open(magic.MAGIC_MIME)
            cookie.load()
            detected = cookie.file(f)
        except AttributeError as e:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        return detected.decode('utf-8')
    return detected
def preprocess(sample):
    """Preprocess files after upload.

    When the uploaded sample is a zip archive (and its MIME type is not in
    ``skip_mimes``), every member is extracted with the configured
    "infected" password, stored under its SHA-256 digest in the samples
    directory, and registered as a child Sample of the upload.

    :param sample: :class:`~app.models.Sample`
    :return: None
    """
    cfg = current_app.config
    archive_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'], sample.sha256)
    if not zipfile.is_zipfile(archive_path):
        return None

    mt = magic.from_file(archive_path, mime=True)
    if mt in skip_mimes:
        return None

    current_app.log.debug('Extracting {}'.format(archive_path))
    # The context manager closes the archive handle even if a member
    # fails to extract.
    with zipfile.ZipFile(archive_path) as zfile:
        for zipfo in zfile.namelist():
            if zfile.getinfo(zipfo).compress_type == 99:  # PK compat. v5.1
                pwd = '-p{}'.format(cfg['INFECTED_PASSWD'])
                # Always point 7z at the archive. Reusing one path variable
                # for the archive and the extracted member made every call
                # after the first target an extracted file.
                # NOTE(review): no member name is passed to 7z, so this
                # streams the whole archive for each such member - confirm
                # whether that is intended.
                with popen('7z', 'e', '-so', pwd, archive_path) as zproc:
                    buf, stderr = zproc.communicate()
            else:
                buf = zfile.read(zipfo,
                                 pwd=bytes(cfg['INFECTED_PASSWD'], 'utf-8'))

            digests = get_hashes(buf)
            member_path = os.path.join(cfg['APP_UPLOADS_SAMPLES'],
                                       digests.sha256)
            # Content-addressed storage: write each payload only once.
            if not os.path.isfile(member_path):
                with open(member_path, 'wb') as wf:
                    wf.write(buf)

            s = Sample(user_id=sample.user_id, filename=zipfo,
                       parent_id=sample.id, md5=digests.md5,
                       sha1=digests.sha1, sha256=digests.sha256,
                       sha512=digests.sha512, ctph=digests.ctph)
            db.session.add(s)
            db.session.commit()
def _check(self, file):
    """
    Run the check that matches `file` and return its result.

    The check is chosen from `extension_map` by file extension; when the
    extension is unknown, the first `magic_map` entry whose key occurs in
    libmagic's description of the file is used. Raises Error if the file
    is missing, is of an unknown type, or cannot be decoded as text.
    """
    if not os.path.exists(file):
        raise Error("file \"{}\" not found".format(file))

    suffix = os.path.splitext(file)[1]
    try:
        check = self.extension_map[suffix[1:]]
    except KeyError:
        # Unknown extension: sniff the content instead.
        magic_type = magic.from_file(file)
        no_match = object()
        check = next(
            (cls for name, cls in self.magic_map.items() if name in magic_type),
            no_match,
        )
        if check is no_match:
            raise Error("unknown file type \"{}\", skipping...".format(file))

    try:
        with open(file) as f:
            code = f.read()
    except UnicodeDecodeError:
        raise Error("file does not seem to contain text, skipping...")

    # Ensure we don't warn about adding trailing newline
    if code and code[-1] != '\n':
        code += '\n'

    return check(code)
def handle(cls, user, club, file):
    """Validate and store an uploaded file, then create its record.

    The upload is saved under a random name in /tmp, its real MIME type is
    detected from the content, a thumbnail is produced at the permanent
    location and the temporary file is always removed.

    :param user: stored on the new object as ``uploader``.
    :param club: stored on the new object as ``club``.
    :param file: upload object exposing ``save(path)``.
    :return: the result of ``obj.create()``.
    :raises UploadNotSupported: if the MIME type is not in ``cls._mimedict``.
    """
    import binascii

    # str.encode('hex') exists only on Python 2; hexlify works on both.
    # Keep the name a native str on either version.
    filename = binascii.hexlify(os.urandom(8))
    if not isinstance(filename, str):
        filename = filename.decode('ascii')

    temppath = os.path.join('/tmp', filename)
    file.save(temppath)

    try:
        # Don't use mimetypes.guess_type(temppath) -- Faked extensions
        mime = magic.from_file(temppath, mime=True)
        if mime not in cls._mimedict:
            raise UploadNotSupported

        filename = filename + cls._mimedict[mime]
        permpath = cls.mk_internal_path(filename)
        permdir = os.path.dirname(permpath)
        if not os.path.isdir(permdir):
            os.makedirs(permdir, 0o755)

        # resize to 600, 450
        cls._thumb(temppath, permpath)
        fs.watch(permpath)
    finally:
        os.remove(temppath)

    obj = cls.new()
    obj.club = club
    obj.uploader = user
    obj._location = filename
    obj.mime = mime
    return obj.create()
def check(filepath):
    """Return True if libmagic's MIME type for *filepath* starts with
    'application/pdf', False otherwise."""
    detected = magic.from_file(filepath, mime=True)
    return bool(re.match('application/pdf', detected))
def get_magic(filename):
    """Describe *filename* with libmagic.

    Uses the module-level ``g_m`` object when it is set, otherwise falls
    back to ``magic.from_file``.
    """
    if not g_m:
        return magic.from_file(filename)
    return g_m.file(filename)
def guess_mime_type_from_file_contents(file_path):
    """
    Get type from file magic bytes.

    Returns the MIME type, or None when libmagic returns an empty value.
    """
    detected = magic.from_file(file_path, mime=True)
    return detected if detected else None
def _compute_default_properties(self):
    """Populate the default properties derived from ``self['filepath']``.

    Sets 'names', 'detailed_type', 'mime', an empty 'analysis' list and an
    'antivirus' mapping with one False entry per antivirus module, then
    calls ``_set_type``.
    """
    self['names'] = [os.path.basename(self['filepath'])]
    self['detailed_type'] = magic.from_file(self['filepath'])
    self['mime'] = magic.from_file(self['filepath'], mime=True)
    self['analysis'] = []

    # Init antivirus status
    self['antivirus'] = {}
    antivirus_modules = dispatcher.get_antivirus_modules()
    for av_module in antivirus_modules:
        self['antivirus'][av_module.name] = False

    # Convert mime/types into clearer type
    self._set_type()
def create_by_old_paste(cls, filehash):
    """Build an instance for an already stored file identified by
    *filehash*.

    The MIME type is detected with libmagic and the size is read from the
    file's stat result.
    """
    stored_path = get_file_path(filehash)
    mimetype = magic.from_file(stored_path, mime=True)
    size = os.stat(stored_path).st_size
    return cls(filehash, mimetype, size, filehash=filehash)
def create_by_old_paste(cls, filehash, symlink):
    """Build an instance for an already stored file identified by
    *filehash*, forwarding *symlink* to the constructor.

    The MIME type is detected with libmagic and the size is read from the
    file's stat result.
    """
    stored_path = get_file_path(filehash)
    mimetype = magic.from_file(stored_path, mime=True)
    size = os.stat(stored_path).st_size
    return cls(filehash, mimetype, size, filehash=filehash, symlink=symlink)
def load_pickle(pickle_path, dataset_path):
    """Return the cached listing of '*.JPEG' files under *dataset_path*.

    When *pickle_path* exists it is loaded and returned. Otherwise every
    directory below *dataset_path* is globbed for '*.JPEG' files, the
    result is pickled to *pickle_path* and returned.

    :param pickle_path: location of the pickle cache.
    :param dataset_path: root directory that is walked for images.
    :return: dict with a single key 'image_path' holding the file names
        (a numpy array when at least one directory was walked).
    """
    if os.path.exists(pickle_path):
        # Context manager so the handle is closed deterministically.
        with open(pickle_path, "rb") as fh:
            return pickle.load(fh)

    image_files = []
    for dirpath, _, _ in os.walk(dataset_path):
        # may be JPEG, depending on your image files
        image_files.append(glob.glob(os.path.join(dirpath, '*.JPEG')))

    # NOTE: the images are not validated. A content check, e.g.
    # magic.from_file(filename, mime=True) == 'image/jpeg', could be
    # applied per file here if needed.
    if len(image_files) > 0:
        image_files = np.hstack(image_files)

    dataset_filenames = {'image_path': image_files}
    with open(pickle_path, "wb") as fh:
        pickle.dump(dataset_filenames, fh)
    return dataset_filenames
def get_executables(files):
    """
    Return the entries of *files* whose libmagic description contains the
    word "executable", in their original order.
    """
    return [path for path in files if "executable" in magic.from_file(path)]
def _get_and_cache(file_path, supported_formats):
    """Look up the format of *file_path* by MIME type and cache it.

    :param file_path: path of the file to characterize.
    :param supported_formats: mapping from MIME type to format.
    :return: the format found for the file's MIME type.
    :raises UnsupportedFormat: (HTTP status 500) when the MIME type is not
        a key of *supported_formats*.
    """
    mime_type = from_file(file_path, mime=True)
    try:
        fmt = supported_formats[mime_type]
    except KeyError:
        template = '{0} characterized as {1} format, which is not supported'
        raise UnsupportedFormat(template.format(file_path, mime_type),
                                http_status_code=500)
    MagicCharacterizerMixin._cache[file_path] = fmt
    return fmt
def file_info(self, report):
    """Return descriptive lines about ``self.filename``.

    :param report: when equal to "output", an empty string is returned
        instead of the info list.
    :return: "" or a list of strings (name, size, MIME type, MD5, SHA1 and,
        when ``ssdeep_r`` is truthy, the ssdeep hash).
    """
    # The file is read before the report mode is checked, so a missing
    # file raises in both modes.
    with open(self.filename, 'rb') as handle:
        payload = handle.read()

    if report == "output":
        return ""

    info = [
        "File: {}".format(self.filename),
        "Size: {} bytes".format(os.path.getsize(self.filename)),
        "Type: {}".format(magic.from_file(self.filename, mime=True)),
        "MD5: {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(self.get_ssdeep()))
    return info
def file_info(filename):
    """Return descriptive lines about *filename*.

    :return: list of strings (name, size, MIME type, MD5, SHA1 and, when
        ``ssdeep_r`` is truthy, the ssdeep hash).
    """
    with open(filename, 'rb') as handle:
        payload = handle.read()

    info = [
        "File: {}".format(filename),
        "Size: {} bytes".format(os.path.getsize(filename)),
        "Type: {}".format(magic.from_file(filename, mime=True)),
        "MD5: {}".format(hashlib.md5(payload).hexdigest()),
        "SHA1: {}".format(hashlib.sha1(payload).hexdigest()),
    ]
    if ssdeep_r:
        info.append("ssdeep: {}".format(ssdeep.hash_from_file(filename)))
    return info
def post_file():
    """Handle a (possibly chunked) multipart file upload.

    The upload is written to a temporary file, hashed, registered in the
    database, pushed to swift and announced to the worker queue.

    :return: JSON response built from the file record's ``to_dict()``.
    :raises BadRequestException: if the form has no part named 'file'.
    """
    file_uuid = secure_filename(str(uuid.uuid4()))
    filename = '/tmp/%s' % file_uuid

    try:
        upload = request.files['file']
    except Exception:
        raise BadRequestException("Not a valid multipart upload form with "
                                  "key named file.")

    try:
        if 'Content-Range' in request.headers:
            # Extract starting byte from Content-Range header string.
            range_str = request.headers['Content-Range']
            start_bytes = int(range_str.split(' ')[1].split('-')[0])

            # Append chunk to the file on disk, or create new.
            # Binary mode is required: the stream yields raw bytes, which a
            # text-mode handle rejects on Python 3.
            # NOTE(review): in append mode writes go to the end of the file
            # regardless of seek(), and the file name is a fresh UUID per
            # request, so chunks of one upload never share a file here.
            with open(filename, 'ab') as f:
                f.seek(start_bytes)
                f.write(upload.stream.read())
        else:
            # This is not a chunked request, so just save the whole file.
            upload.save(filename)

        # Generate hash of file, and create new, or renew existing db row.
        file_hashes = get_all_hashes(filename)
        file_size = os.path.getsize(filename)
        file_type = magic.from_file(filename, mime=True)
        record = create_or_renew_by_hash(file_hashes, file_size, file_type)
        file_id = record.file_id
        file_dict = record.to_dict()

        # Upload to swift and remove the local temp file.
        upload_to_swift(filename, file_uuid)
    finally:
        # Remove the temp file on failure too, so aborted requests do not
        # accumulate files in /tmp.
        if os.path.exists(filename):
            os.remove(filename)

    # Send message to worker queue with file details.
    worker_msg = {"file_uuid": file_uuid, "file_id": file_id}
    submit_worker_notification(worker_msg)

    return jsonify(file_dict)
def maybe_gunzip(fname, base, ext):
    """Decompress *fname* when libmagic describes it as gzip data.

    :param fname: path of the candidate file; falsy values are returned
        unchanged without inspecting anything.
    :param base: forwarded to ``safe_fname`` to name the output.
    :param ext: forwarded to ``safe_fname`` to name the output.
    :return: the decompressed file's name, or *fname* when nothing was
        decompressed.
    """
    if not fname or 'gzip' not in magic.from_file(fname):
        return fname

    started = time.time()
    print("Gunzip file " + str(fname))
    target = safe_fname(base, ext)
    sh("gunzip", fname, "-c >", target)
    print("Gunzip took %g seconds" % (time.time() - started))
    return target
def get_filetype(fpath):
    """Return the MIME type libmagic detects for the file at *fpath*."""
    mime_type = magic.from_file(fpath, mime=True)
    return mime_type
def file_is(file_description, fmt):
    """Tell whether a file or buffer is a `fmt` document.

    :file_description: full path of a file (str) or a buffer (bytes)
        holding the data to test.
    :fmt: text that must occur in the detected MIME type; it is inserted
        into a regular expression unescaped.
    :returns: True if the MIME type matches `fmt` (case-insensitively),
        False otherwise, including when `file_description` is neither str
        nor bytes.
    """
    import magic
    logger.debug("Checking filetype")

    pattern = r".*%s.*" % fmt
    # Default covers inputs that are neither str nor bytes; without it the
    # final return raised UnboundLocalError.
    result = None

    if isinstance(file_description, str):
        # This means that the file_description is a string
        result = re.match(
            pattern,
            magic.from_file(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "File %s appears to be of type %s" % (file_description, fmt)
            )
    elif isinstance(file_description, bytes):
        # Suppose that file_description is a buffer.
        # Match case-insensitively, consistent with the path branch.
        result = re.match(
            pattern,
            magic.from_buffer(file_description, mime=True),
            re.IGNORECASE
        )
        if result:
            logger.debug(
                "Buffer appears to be of type %s" % (fmt)
            )

    return True if result else False
def register_files(self):
    """Walk ``self.extracted_path`` and register every regular file.

    Each file is identified by the MD5 of "<file name>:<content>". An
    existing FileModel with that hash is linked to ``self.firmware``;
    otherwise a new FileModel is created, saved, typed with libmagic and
    passed to ``self.find_loots``.
    """
    print("Start registering files")
    for root, dirs, files in os.walk(self.extracted_path):
        for file in files:
            full_path = os.path.join(root, file)
            # Skip anything that is not a regular file.
            if not os.path.isfile(full_path):
                continue
            # Stored file name is the path with extracted_path removed.
            path = full_path.replace(self.extracted_path, "")
            content = ""
            hash = ""
            with open(full_path, "rb") as fd:
                content = fd.read()
                # The hash covers the bare file name plus the content.
                hash_content = "%s:%s" % (file, content)
                hash = hashlib.md5(hash_content.encode('utf-8')).hexdigest()
            try:
                # Known file: only link it to this firmware.
                file_obj = FileModel.objects.get(hash=hash)
                file_obj.firmware.add(self.firmware)
                file_obj.save()
            except FileModel.DoesNotExist:
                try:
                    file_obj = FileModel()
                    file_obj.filepath = os.path.join(root, file)
                    file_obj.hash = hash
                    file_obj.filesize = len(content)
                    file_obj.filename = path
                    # Saved before the firmware relation is added.
                    file_obj.save()
                    file_obj.firmware.add(self.firmware)
                    file_obj.file_type = magic.from_file(os.path.join(root, file))
                    file_obj.save()
                    self.find_loots(file_obj)
                    # Performance tweak
                    # NOTE(review): nb_loots is assigned but no save()
                    # follows in this block - confirm it is persisted.
                    file_obj.nb_loots = file_obj.loots.all().count()
                except:
                    # NOTE(review): bare except; the fallback type is also
                    # not saved in this block.
                    file_obj.file_type = "unknown"
    print("Files registered")
def parse_file_info(file_path, dir_path):
    """Dispatch *file_path* to the handler registered for its MIME type.

    :param file_path: path of the file to inspect.
    :param dir_path: forwarded unchanged to the handler.
    :return: the handler's result, or None when the MIME type has no entry
        in ``file_mimetype_relation``.
    """
    print("entering parse_file_info")
    mime_type = magic.from_file(file_path, mime=True)
    print(mime_type)
    print(file_path)

    if mime_type not in file_mimetype_relation:
        return None
    handler = file_mimetype_relation[mime_type]
    return handler(file_path, dir_path)
def _get_file_type(full_targ_path):
    """Return the file type of the sample at *full_targ_path* as a string.

    Uses the ``magic.open`` API when the installed module provides it and
    falls back to ``magic.from_file`` on AttributeError.
    """
    magic_obj = None
    try:
        magic_obj = magic.open(magic.MAGIC_NONE)
        magic_obj.load()
        magic_out = str(magic_obj.file(full_targ_path))
    except AttributeError:
        magic_out = str(magic.from_file(full_targ_path))
    finally:
        # Release the libmagic cookie; it used to be left open on every
        # call.
        if magic_obj is not None:
            magic_obj.close()
    return magic_out
def _get_file_type(full_targ_path): # This function takes the full path of a target sample and determines/returns the file type via python-magic. try: #magicObj = magic.open(magic.MAGIC_NONE) #magicObj.load() #magic_out = str(magicObj.file(full_targ_path)) magicObj = magic.Magic(magic_file=r'C:/Program Files (x86)/GnuWin32/share/misc/magic', mime=True) magic_out = str(magicObj.from_file(full_targ_path)) print magic_out except AttributeError: magic_out = str(magic.from_file(full_targ_path)) print magic_out+" ERROR?!?!?!!?" return(magic_out)
def get_type(self):
    """Get the file type of ``self.file_path``, following symlinks.

    When HAVE_MAGIC is set, the ``magic.open`` API (MAGIC_SYMLINK flag) is
    tried first, then ``magic.from_file``. If no type was obtained, the
    external ``file -b -L`` command is used.

    @return: file type, or None when every strategy fails.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception:
                pass
        finally:
            # Only close a cookie that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", "-L", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains the pipe and reaps the child process.
            output, _ = p.communicate()
            file_type = output.strip()
        except Exception:
            pass

    return file_type
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg), following
    symlinks.

    When HAVE_MAGIC is set, the ``magic.open`` API (MAGIC_MIME and
    MAGIC_SYMLINK flags) is tried first, then ``magic.from_file``. If no
    type was obtained, ``file -b -L --mime-type`` is used.

    @return: file content type, or None when every strategy fails.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME | magic.MAGIC_SYMLINK)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # Only close a cookie that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(
                ["file", "-b", "-L", "--mime-type", self.file_path],
                stdout=subprocess.PIPE)
            # communicate() drains the pipe and reaps the child process.
            output, _ = p.communicate()
            file_type = output.strip()
        except Exception:
            pass

    return file_type
def processDownload(tmpFilePath, fileName, fileUrl):
    """Validate a downloaded temporary file and upload it to Viper.

    The file is rejected when it is larger than 10 MB, when its SHA-256 is
    not accepted by ``isAcceptedHash``, or when its libmagic MIME type is
    not in the accepted list. Accepted files are moved to the output
    folder under their hash, uploaded, recorded in the hash cache and
    cleaned up.

    :param tmpFilePath: path of the downloaded temporary file.
    :param fileName: forwarded to ``uploadToViper``.
    :param fileUrl: forwarded to ``uploadToViper``.
    :return: False when the file is rejected, otherwise the result of
        ``uploadToViper``.
    """
    acceptedMimeTypes = [
        'application/octet-stream',
        'application/x-dosexec',
        'application/x-msdownload',
        'application/x-ms-installer',
        'application/pdf',
        'application/x-pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.template',
        'application/vnd.ms-word.document.macroEnabled',
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.template',
        'application/vnd.ms-excel.sheet.macroEnabled',
        'application/vnd.ms-excel.template.macroEnabled',
        'application/vnd.ms-excel.addin.macroEnabled',
        'application/vnd.ms-excel.sheet.binary.macroEnabled',
        'application/x-shockwave-flash',
    ]

    logging.info('Downloaded as temporary file: {0}. Beginning processing...'.format(tmpFilePath))

    # Shifting right by 20 bits converts bytes to whole mebibytes.
    fileSize = os.path.getsize(tmpFilePath) >> 20
    if fileSize > 10:
        logging.error('File is {0}MB. Too large to process.'.format(fileSize))
        cleanUp(tmpFilePath)
        return False

    fileHash = sha256SumFile(tmpFilePath)
    hashRejected = not isAcceptedHash(fileHash)
    if hashRejected:
        cleanUp(tmpFilePath)
        return False

    filePath = os.path.join(baseConfig.outputFolder, fileHash)
    os.rename(tmpFilePath, filePath)

    # Trust only the content type of the downloaded file.
    mimeType = magic.from_file(filePath, mime=True)
    if mimeType not in acceptedMimeTypes:
        logging.error('Detected non-binary or executable file type ({0}). Skipping: {1}'.format(mimeType, filePath))
        cleanUp(filePath)
        return False

    logging.info('File with hash: {0} identified as type: {1}'.format(fileHash, mimeType))

    uploaded = uploadToViper(filePath, fileName, fileHash, fileUrl)
    addToHashCache(fileHash)
    cleanUp(filePath)
    return uploaded
def validate_elm_make(ctx, param, value):
    """Click callback: check that *value* points at a real elm-make binary.

    :param ctx: click context (unused).
    :param param: click parameter (unused).
    :param value: path or command name given by the user, or None.
    :return: *value* unchanged when it is None or resolves to a file whose
        MIME type does not start with 'text'.
    :raises click.BadParameter: when the file cannot be found, or when it
        looks like a text file rather than a binary.
    """
    if value is None:
        return value

    # Try the value as a path first, then as a command on PATH.
    realpath = os.path.realpath(value)
    if not os.path.isfile(realpath):
        realpath = shutil.which(value)

    if realpath is None or not os.path.isfile(realpath):
        raise click.BadParameter('{} not found'.format(value))

    elm_make_mimetype = magic.from_file(realpath, mime=True)
    looks_like_text = elm_make_mimetype.startswith('text')
    if not looks_like_text:
        return value

    # Suggest where the real binary presumably lives for an npm install.
    binwrap_parts = (
        os.path.dirname(realpath), os.pardir, 'elm', 'Elm-Platform', '*',
        '.cabal-sandbox', 'bin', 'elm-make',
    )
    perhaps_binwrap_of = os.path.normpath(os.path.join(*binwrap_parts))
    raise click.BadParameter('''should be the real elm-make binary; this looks like a text file. if you installed Elm through npm, then try {}'''.format(perhaps_binwrap_of))
def libmagic_file_type(self):
    """
    Returns:
        str: The MIME type libmagic detects for ``self.path``.
    """
    detected = magic.from_file(self.path, mime=True)
    return detected
def get_type(self):
    """Get the file type of ``self.file_path``.

    When HAVE_MAGIC is set, the ``magic.open`` API is tried first, then
    ``magic.from_file``. If no type was obtained, the external ``file -b``
    command is used. Failures of the last two steps are logged at debug
    level.

    @return: file type, or None when every strategy fails.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_NONE)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path)
            except Exception as e:
                log.debug("Error getting magic from file %s: %s",
                          self.file_path, e)
        finally:
            # Only close a cookie that was actually opened.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            p = subprocess.Popen(["file", "-b", self.file_path],
                                 stdout=subprocess.PIPE)
            # communicate() drains the pipe and reaps the child process.
            output, _ = p.communicate()
            file_type = output.strip()
        except Exception as e:
            log.debug("Error running file(1) on %s: %s",
                      self.file_path, e)

    return file_type
def get_content_type(self):
    """Get MIME content file type (example: image/jpeg).

    When HAVE_MAGIC is set, the ``magic.open`` API (MAGIC_MIME flag) is
    tried first, then ``magic.from_file``. If no type was obtained,
    ``file -b --mime-type`` is used.

    @return: file content type, or None when every strategy fails.
    """
    file_type = None
    if HAVE_MAGIC:
        ms = None
        try:
            ms = magic.open(magic.MAGIC_MIME)
            ms.load()
            file_type = ms.file(self.file_path)
        except Exception:
            try:
                file_type = magic.from_file(self.file_path, mime=True)
            except Exception:
                pass
        finally:
            # Only close a cookie that was actually opened, instead of
            # relying on a swallowed NameError.
            if ms is not None:
                try:
                    ms.close()
                except Exception:
                    pass

    if file_type is None:
        try:
            args = ["file", "-b", "--mime-type", self.file_path]
            file_type = subprocess.check_output(args).strip()
        except Exception:
            pass

    return file_type
def guess_mimetype(path):
    """Return the MIME type of *path* as text.

    'audio/x-m4a' is reported as 'audio/mp4'.

    :param path: file location; converted with ``str()`` before use.
    :return: the MIME type as a str.
    """
    magic_mimetype = magic.from_file(str(path), mime=True)
    # Some python-magic versions return bytes, others str. Normalise
    # first so the comparison and the return value work with both.
    if isinstance(magic_mimetype, bytes):
        magic_mimetype = magic_mimetype.decode("utf-8")
    if magic_mimetype == "audio/x-m4a":
        return "audio/mp4"
    return magic_mimetype
def inspect(self, sample):
    """Store libmagic's description and MIME type of ``sample.path`` in
    ``sample.info`` under this inspector's NAME."""
    description = magic.from_file(sample.path)
    mime_type = magic.from_file(sample.path, mime = True)
    sample.info[self.NAME] = {"magic": description, "mime": mime_type}
def get_mime(self):
    """Return the MIME type of ``self.path``.

    The ``magic.open`` API is tried first; when that fails a
    ``magic.Magic(mime=True)`` instance is used.

    :return: the MIME type, or '' when both strategies fail.
    """
    ms = None
    try:
        ms = magic.open(magic.MIME)
        ms.load()
        mime_type = ms.file(self.path)
    except Exception:
        try:
            mime = magic.Magic(mime=True)
            mime_type = mime.from_file(self.path)
        except Exception:
            return ''
    finally:
        # Release the libmagic cookie; it used to be left open.
        if ms is not None:
            try:
                ms.close()
            except Exception:
                pass

    return mime_type
def mime(self):
    """Return the MIME type of the file named by ``self.fetch('filename')``.

    Works with either of the two modules that install as ``magic``.

    :raises ImportError: if the imported ``magic`` module is neither of
        the two supported packages.
    """
    if hasattr(magic, "from_file"):
        # Use https://pypi.python.org/pypi/python-magic
        return magic.from_file(self.fetch('filename'), mime=True)
    elif hasattr(magic, "open"):
        # Use the python-magic library in distro repos from the `file`
        # command - http://www.darwinsys.com/file/
        magic_instance = magic.open(magic.MAGIC_MIME)
        try:
            magic_instance.load()
            return magic_instance.file(self.fetch('filename'))
        finally:
            # Release the libmagic cookie; it used to be left open on
            # every call.
            magic_instance.close()

    raise ImportError(
        'The `magic` module that was found is not the expected pypi '
        'package python-magic (https://pypi.python.org/pypi/python-magic) '
        'nor file\'s (http://www.darwinsys.com/file/) package.')
def create_pads_from_files(job_id, attachment, email, client_id, client_secret):
    """ For each HTML file in zipped attachment, create a new pad,
    return the number of created pads

    :param job_id: forwarded to insert_pad_from_file and email_error.
    :param attachment: path of the zip; it must match
        '.../attachments/<name>.zip', and is unpacked into './data/<name>'.
    :param email: only used in log messages.
    :param client_id: Hackpad consumer key.
    :param client_secret: Hackpad consumer secret.
    :return: tuple ``(pads_created, pads_skipped)``.
    """
    logging.info("Opening attached zip %s." % attachment)
    # Raw string: '\.' in a normal literal is an invalid escape sequence.
    m = re.search(r'^.+attachments/(.+)\.zip$', attachment)
    directory = './data/' + m.group(1)
    unzip_attachment(attachment, directory)
    files = os.listdir(directory)

    hackpad = Hackpad(api_scheme = os.getenv('HACKPAD_API_SCHEME') or 'http',
                      api_domain = os.getenv('HACKPAD_API_DOMAIN') or 'hackpad.dev',
                      sub_domain = os.getenv('HACKPAD_SUB_DOMAIN') or '',
                      consumer_key = client_id,
                      consumer_secret = client_secret)

    pads_created = pads_skipped = 0

    for file_name in files:
        file_path = directory + '/' + file_name
        # check if it is really an html file
        file_type = magic.from_file(file_path, mime=True)
        if file_type != 'text/html':
            logging.info('Invalid file type for file %s :%s' % (file_path, file_type))
            continue

        # The context manager closes the handle even if the import raises.
        with open(file_path) as fh:
            logging.info('importing for %s: %s' % (email, file_name))
            if insert_pad_from_file(job_id, hackpad, fh, file_name, client_id, client_secret):
                pads_created += 1
            else:
                pads_skipped += 1

    # Check if all files are imported
    # NOTE(review): files rejected by the MIME check are counted in
    # neither total, so any non-HTML file triggers this error mail.
    if pads_created + pads_skipped != len(files):
        email_error("Not all files were processed", job_id)

    return pads_created, pads_skipped
def attachFile(attachList, filename, pos=None, replace=False):
    """Check a path and add it to the attachment list

    The file is only inspected for user feedback; the path is added even
    when the file is missing or unreadable.

    If pos is given and replace is False, insert attachment at given
    position. If pos is given and replace is True, replace the attachment
    at the given position. Positions are 1-based; an out-of-range position
    prints a message and leaves the list untouched.
    """
    index = None
    if pos is not None:
        if pos < 1 or pos > len(attachList):
            print("Bad position. {} not between 1 and {}".format(pos, len(attachList)))
            return
        # Adjust from human position to index
        index = pos - 1

    try:
        st = os.stat(filename)
    except OSError as err:
        import errno
        # Can't read it. Is it because it doesn't exist?
        if err.errno != errno.ENOENT:
            print("WARNING: Couldn't get information about the file: %s" % err.strerror)
            print("Adding to list anyway. We'll try reading it again when completing the message.")
        else:
            print("WARNING: Given file doesn't currently exist. Adding to list anyway. We'll try reading it again when completing the message")
    else:
        if os.access(filename, os.R_OK):
            print("Attachment added to list. Raw size is currently %i bytes. Note: we'll actually read the data when completing the message" % st.st_size)
            mtype = magic.from_file(filename, mime=True)
            print("Mime type appears to be %s" % mtype)
        else:
            print("WARNING: Can't read existing file. Adding to list anyway. We'll try again when completing the message.")

    if index is None:
        attachList.append(filename)
    elif replace == False:  # equality kept on purpose: only False inserts
        attachList.insert(index, filename)
    else:
        attachList[index] = filename
def _download_file(self, tg_msg, file_obj, msg_type):
    """
    Download media file from telegram platform.

    Args:
        tg_msg: Telegram message instance
        file_obj: File object
        msg_type: Type of message

    Returns:
        tuple of str[2]: Full path of the file, MIME type

    Raises:
        EFBMessageError: if the reported file size exceeds
            ``telegram.constants.MAX_FILESIZE_DOWNLOAD``.
    """
    path = os.path.join("storage", self.channel_id)
    if not os.path.exists(path):
        os.makedirs(path)

    size = getattr(file_obj, "file_size", None)
    file_id = file_obj.file_id
    if size and size > telegram.constants.MAX_FILESIZE_DOWNLOAD:
        raise EFBMessageError("Attachment is too large. Maximum 20 MB. (AT01)")

    f = self.bot.bot.getFile(file_id)
    fname = "%s_%s_%s_%s" % (msg_type, tg_msg.chat.id, tg_msg.message_id, int(time.time()))
    fullpath = os.path.join(path, fname)
    f.download(fullpath)

    # Prefer the MIME type reported on the file object and sniff the
    # content only when it is missing or empty. A getattr() default is
    # evaluated on every call, and a None mime_type would crash
    # guess_extension() below.
    mime = getattr(file_obj, "mime_type", None) or magic.from_file(fullpath, mime=True)
    if isinstance(mime, bytes):
        mime = mime.decode()

    guess_ext = mimetypes.guess_extension(mime) or ".unknown"
    if guess_ext == ".unknown":
        self.logger.warning("File %s with mime %s has no matching extensions.", fullpath, mime)

    # JPEG files always get '.jpeg', whatever mimetypes suggests.
    ext = ".jpeg" if mime == "image/jpeg" else guess_ext
    os.rename(fullpath, "%s%s" % (fullpath, ext))
    fullpath = "%s%s" % (fullpath, ext)
    return fullpath, mime
def _produce_one_sample(self):
    """Build the TensorFlow ops that produce one input/output image pair.

    ``self.path`` is a text file listing image names, one per line. Each
    name is looked up in the 'input' and 'output' subdirectories next to
    the list. Images are decoded, scaled by their white level (65535 when
    libmagic describes the first file as 16-bit, else 255), concatenated
    along axis 2 and passed to ``self._augment_data``.

    Returns:
        dict with keys 'lowres_input', 'lowres_output', 'image_input' and
        'image_output'.

    Raises:
        ValueError: if the directory of ``self.path`` fails ``check_dir``.
    """
    dirname = os.path.dirname(self.path)
    if not check_dir(dirname):
        raise ValueError("Invalid data path.")
    with open(self.path, 'r') as fid:
        # Iterate the handle directly: xreadlines() exists only on
        # Python 2, while plain iteration works on both versions.
        flist = [l.strip() for l in fid]

    if self.shuffle:
        random.shuffle(flist)

    input_files = [os.path.join(dirname, 'input', f) for f in flist]
    output_files = [os.path.join(dirname, 'output', f) for f in flist]

    self.nsamples = len(input_files)

    # 0o123 (decimal 83) is the value the Python 2 octal literal 0123 had;
    # this spelling is accepted by Python 2.6+ and Python 3.
    input_queue, output_queue = tf.train.slice_input_producer(
        [input_files, output_files], shuffle=self.shuffle,
        seed=0o123, num_epochs=self.num_epochs)

    # Bit depth is taken from the first file of each list only.
    if '16-bit' in magic.from_file(input_files[0]):
        input_dtype = tf.uint16
        input_wl = 65535.0
    else:
        input_wl = 255.0
        input_dtype = tf.uint8
    if '16-bit' in magic.from_file(output_files[0]):
        output_dtype = tf.uint16
        output_wl = 65535.0
    else:
        output_wl = 255.0
        output_dtype = tf.uint8

    input_file = tf.read_file(input_queue)
    output_file = tf.read_file(output_queue)

    # The decoder is chosen from the first file's extension.
    if os.path.splitext(input_files[0])[-1] == '.jpg':
        im_input = tf.image.decode_jpeg(input_file, channels=3)
    else:
        im_input = tf.image.decode_png(input_file, dtype=input_dtype, channels=3)

    if os.path.splitext(output_files[0])[-1] == '.jpg':
        im_output = tf.image.decode_jpeg(output_file, channels=3)
    else:
        im_output = tf.image.decode_png(output_file, dtype=output_dtype, channels=3)

    # normalize input/output
    sample = {}
    with tf.name_scope('normalize_images'):
        im_input = tf.to_float(im_input)/input_wl
        im_output = tf.to_float(im_output)/output_wl

    # Stack input and output along axis 2 so both go through the same
    # augmentation call, then split them back into 3-channel halves.
    inout = tf.concat([im_input, im_output], 2)
    fullres, inout = self._augment_data(inout, 6)

    sample['lowres_input'] = inout[:, :, :3]
    sample['lowres_output'] = inout[:, :, 3:]
    sample['image_input'] = fullres[:, :, :3]
    sample['image_output'] = fullres[:, :, 3:]
    return sample