The following 50 code examples, extracted from Python open-source projects, illustrate how to use os.scandir().
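Before the project examples, a minimal usage sketch of the standard-library API may help (the "." directory and the printed format are illustrative): os.scandir() yields os.DirEntry objects, whose name, path, is_file(), is_dir() and stat() attributes appear throughout the examples below.

import os

# Minimal sketch: list the entries of the current directory with their type and size.
with os.scandir(".") as it:          # os.scandir() can be used as a context manager (Python 3.6+)
    for entry in it:                 # each entry is an os.DirEntry
        kind = "dir" if entry.is_dir() else "file"
        size = entry.stat().st_size  # the stat() result is cached on the entry where possible
        print(f"{kind:4} {size:10d} {entry.name}")
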
def generate(self):
    """ Main method of this class: it walks a directory and gathers txt files to process.
    :return: string name of created arff file.
    """
    folders = [f.path for f in os.scandir(self.folderpath) if f.is_dir()]
    for folder in folders:
        # get txt files from folder path
        files = [f.path for f in os.scandir(folder) if f.name.endswith(".txt")]
        self.empty_counter()
        for f_name in files:
            self.totalCounter = self.count_words(f_name)
            self.counterList[f_name] = self.totalCounter
    arff_file = self.create_arff()  # generate file
    return arff_file

def __init__(self, dirName):
    """
    Args:
        dirName (string): directory where to load the corpus
    """
    self.MAX_NUMBER_SUBDIR = 10
    self.conversations = []
    __dir = os.path.join(dirName, "dialogs")
    number_subdir = 0
    for sub in tqdm(os.scandir(__dir), desc="Ubuntu dialogs subfolders",
                    total=len(os.listdir(__dir))):
        if number_subdir == self.MAX_NUMBER_SUBDIR:
            print("WARNING: Early stopping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
            return
        if sub.is_dir():
            number_subdir += 1
            for f in os.scandir(sub.path):
                if f.name.endswith(".tsv"):
                    self.conversations.append({"lines": self.loadLines(f.path)})

def _do_tree(root_src, root_dest, tmpl_dict, tmpl_ext, tag_delim, level=0):
    if level == 0:
        _mkdir(root_dest)
    for entry in os.scandir(root_src):
        src_path = os.path.join(root_src, entry.name)
        dest_path = os.path.join(root_dest, do_text(entry.name, tmpl_dict, tag_delim))
        if entry.is_dir():
            _mkdir(dest_path, copy_stats_from=src_path)
            _do_tree(src_path, dest_path, tmpl_dict, tmpl_ext, tag_delim, level + 1)
        elif entry.is_file():
            was_tmpl = False
            for ext in tmpl_ext:
                ext = ext.lower()
                if entry.name.lower().endswith(ext):
                    was_tmpl = True
                    dest_path = dest_path[0:-len(ext)]
                    do_file(src_path, dest_path, tmpl_dict, tag_delim)
                    break
            if not was_tmpl:
                shutil.copy2(src_path, dest_path, follow_symlinks=False)

def extract_features(feature_extraction, save_dir, data_dir=DATA_DIR, extension=".cell", model_name=""):
    """
    For all of the files in the `data_dir` with the `extension` extension,
    it extracts the features using the `feature_extraction` function.

    @param feature_extraction is a function that takes a trace as an input and returns a list of features (1D)
    @param save_dir is the directory where you save the features for the traces.
        Every file in this dir is called `{website}-{id}.cellf` with both `website` and `id` being integers
    @param data_dir is the absolute path to the data directory
    @param extension is the extension of the files that contain the raw traces
    @param model_name is used for printing for what model we are extracting features
    """
    paths = []
    for i, f in enumerate(scandir(data_dir)):
        if f.is_file() and f.name[-len(extension):] == extension:
            paths.append(f.path)

    extract_features_from_files(feature_extraction, paths, save_dir, extension=extension, model_name=model_name)

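A hypothetical call might look like the sketch below; the `to_feature_vector` function and the "data/" and "features/" directories are made-up names, not part of the original project.

# Illustrative only: `to_feature_vector`, "data/" and "features/" are invented for this sketch.
def to_feature_vector(trace):
    # e.g. reduce a raw trace to a fixed-length list of numbers
    return [len(trace)]

extract_features(to_feature_vector, "features/", data_dir="data/", extension=".cell", model_name="demo")
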
def __load_mergers(self):
    mergers = {}
    for f in os.scandir(os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "mergers")):
        if f.is_file() and f.name.endswith(".py"):
            name = f.name[:-3]
            mod = __import__("mergers.%s" % name, fromlist=["Merger"])
            try:
                m = getattr(mod, "Merger")
                if issubclass(m, BaseMerger):
                    merger = m()
                    logging.debug("Found merger for %s",
                                  merger.get_supported_software().keys())
                    for software in merger.get_supported_software():
                        if software in mergers:
                            mergers[software].append(merger)
                        else:
                            mergers[software] = [merger]
            except AttributeError:
                logging.warning("Merger %s found but doesn't implement a Merger class inheriting from BaseMerger", name)
    return mergers

def read_files_worker(self, directory, queue):
    """
    Read all files in a directory and output to the queue.

    First line of every file should contain the index.
    Worker separates first line and parses to dict.
    Tuple of index and text is added to queue.

    :directory: Source directory containing files
    :queue: Queue to add the tuples to
    """
    for file in os.scandir(directory):
        if file.is_file():
            with open(file.path, 'r', errors='replace') as f:
                text = f.readlines()
                try:
                    index = literal_eval(text.pop(0).strip())
                    queue.put((index, '\n'.join(text)), block=True)
                except IndexError:
                    LOGGER.error('File {0} is not classifiable'
                                 .format(file.path))
    LOGGER.info('File reading worker done.')

def rcollect(path, depth, filter=None):
    filter = filter or (lambda n: not n.startswith('.'))
    path = os.path.expanduser(path)
    if os.path.exists(path):
        for f in os.scandir(path):
            if filter(f.name):
                t = 'undefined'
                try:
                    t = 'file' if f.is_file() else 'dir' if f.is_dir() else 'undefined'
                except OSError:
                    pass
                if t == 'file':
                    yield f
                elif t == 'dir' and depth > 0:
                    for e in rcollect(f.path, depth - 1, filter):
                        yield e

def get_new_venv_name(count=1):
    if not os.path.exists(get_venv_dir()):  # no cov
        if count == 1:
            return get_random_venv_name()
        else:
            return sorted(get_random_venv_name() for _ in range(count))

    current_venvs = set(p.name for p in os.scandir(get_venv_dir()))
    new_venvs = set()
    while len(new_venvs) < count:
        name = get_random_venv_name()
        while name in current_venvs or name in new_venvs:  # no cov
            name = get_random_venv_name()
        new_venvs.add(name)
    return new_venvs.pop() if count == 1 else sorted(new_venvs)

def organize_junk():
    for entry in os.scandir():
        if entry.is_dir():
            continue
        file_path = Path(entry.name)
        file_format = file_path.suffix.lower()
        if file_format in FILE_FORMATS:
            directory_path = Path(FILE_FORMATS[file_format])
            directory_path.mkdir(exist_ok=True)
            file_path.rename(directory_path.joinpath(file_path))

    try:
        os.mkdir("OTHER-FILES")
    except:
        pass

    for dir in os.scandir():
        try:
            if dir.is_dir():
                os.rmdir(dir)
            else:
                os.rename(os.getcwd() + '/' + str(Path(dir)),
                          os.getcwd() + '/OTHER-FILES/' + str(Path(dir)))
        except:
            pass

async def _iter_items(self, path: str) -> AsyncIterable:
    with os.scandir(path) as directory:
        for item in directory:
            current_level = self._get_depth(item.path)
            # happy scenario, we are at the exact requested level,
            # so return whatever it is: a folder, a file or a symlink.
            if current_level == self._level:
                yield item
                continue
            # we didn't reach the requested sub level yet,
            # so recurse into this entry if it is a folder.
            elif current_level < self._level and item.is_dir():
                async for e in self._iter_items(item.path):
                    yield e
            # and ignore any other scenario, including files and
            # symlinks, if the level is not reached yet.
            continue

def data_reader(input_dir, shuffle=True):
    """Read images from input_dir then shuffle them
    Args:
        input_dir: string, path of input dir, e.g., /path/to/dir
    Returns:
        file_paths: list of strings
    """
    file_paths = []

    for img_file in scandir(input_dir):
        if img_file.name.endswith('.jpg') and img_file.is_file():
            file_paths.append(img_file.path)

    if shuffle:
        # Shuffle the ordering of all image files in order to guarantee
        # random ordering of the images with respect to label in the
        # saved TFRecord files. Make the randomization repeatable.
        shuffled_index = list(range(len(file_paths)))
        random.seed(12345)
        random.shuffle(shuffled_index)

        file_paths = [file_paths[i] for i in shuffled_index]

    return file_paths

def traverse(path, ignore_files=None):
    if not os.path.exists(path):
        return
    if ignore_files is None:
        ignore_files = []
    for item in scandir(path):
        if any(fnmatch.fnmatch(item.name, pattern) for pattern in ignore_files):
            logger.debug('Ignoring %s', item)
            continue
        if item.is_dir():
            for result in traverse(item.path, ignore_files):
                yield os.path.join(item.name, result)
        else:
            yield item.name

def get_sources_from_files(path):
    def order(fname):
        if 'bf' in fname:
            return 0
        if 'lamin' in fname:
            return 1
        if 'fibrillarin' in fname:
            return 2
        if 'tom' in fname:
            return 3
        if 'all' in fname:
            return 10
        return 0

    files = [i.path for i in os.scandir(path) if i.is_file()]
    paths_sources = [i for i in files if (('rgb.tif' in i) or ('bf.tif' in i))]
    paths_sources.sort(key=order)
    sources = []
    for path in paths_sources:
        source = tifffile.imread(path)
        sources.append(source)
    return sources

def save_backup(path_file, n_backups=5):
    if not os.path.exists(path_file):
        return
    path_dir, path_base = os.path.split(path_file)
    path_backup_dir = os.path.join(path_dir, 'backups')
    if not os.path.exists(path_backup_dir):
        os.makedirs(path_backup_dir)
    paths_existing_backups = [i.path for i in os.scandir(path_backup_dir)
                              if (path_base in i.path and i.path.split('.')[-1].isdigit())]
    paths_existing_backups.sort(key=lambda x: os.path.getmtime(x))
    tag = 0
    if len(paths_existing_backups) > 0:
        tag = (int(paths_existing_backups[-1].split('.')[-1]) + 1) % 100
    paths_delete = paths_existing_backups[:-(n_backups - 1)] if n_backups > 1 else paths_existing_backups
    for path in paths_delete:
        os.remove(path)
    path_backup = os.path.join(path_backup_dir, path_base + '.{:02}'.format(tag))
    shutil.copyfile(path_file, path_backup)
    print('wrote to:', path_backup)

def find_source_dirs(path_root_dir):
    """Find source directories to make layouts, going at most 1 layer deep.

    Returns : list of source directories
    """
    def is_source_dir(path):
        if not os.path.isdir(path):
            return False
        has_signal, has_target, has_prediction = False, False, False
        for entry in [i.path for i in os.scandir(path) if i.is_file()]:
            if any(tag in entry for tag in TAGS_SIGNAL):
                has_signal = True
            if any(tag in entry for tag in TAGS_TARGET):
                has_target = True
            if any(tag in entry for tag in TAGS_PREDICTION):
                has_prediction = True
        return has_signal and has_target and has_prediction

    if is_source_dir(path_root_dir):
        return [path_root_dir]
    results = []
    for entry in os.scandir(path_root_dir):
        if is_source_dir(entry.path):
            results.append(entry.path)
    return results

def scan_dir(self, path):
    for entry in os.scandir(path):
        if entry.name.startswith('.'):
            continue
        if entry.is_dir():
            self.scan_dir(entry.path)
            continue
        name, ext = os.path.splitext(entry.name)
        if ext.lower() in IMAGE_EXTS:
            self.add_choice(path)
            break
        if ext.lower() in ARCHIVE_EXTS:
            self.add_choice(entry.path)

def make_custom_check_bins_package(source_dir, package_filename):
    with gen.util.pkgpanda_package_tmpdir() as tmpdir:
        tmp_source_dir = os.path.join(tmpdir, 'check_bins')
        shutil.copytree(source_dir, tmp_source_dir)

        # Apply permissions
        for entry in os.scandir(tmp_source_dir):
            # source_dir should have no subdirs.
            assert entry.is_file()
            os.chmod(entry.path, 0o755)

        # Add an empty pkginfo.json.
        pkginfo_filename = os.path.join(tmp_source_dir, 'pkginfo.json')
        assert not os.path.isfile(pkginfo_filename)
        with open(pkginfo_filename, 'w') as f:
            f.write('{}')
        os.chmod(pkginfo_filename, 0o644)

        gen.util.make_pkgpanda_package(tmp_source_dir, package_filename)

def scantree(path_name, skip_list=None):
    """This function returns the files present in path_name, including the
    files present in subfolders.

    Implementation uses scandir, if available, as it is faster than os.walk"""
    if skip_list is None:
        skip_list = DEFAULT_SKIP_LIST
    try:
        for entry in (e for e in scandir(path_name) if not is_ignored(e.path, skip_list)):
            if entry.is_dir(follow_symlinks=False):
                yield from scantree(entry.path, skip_list)
            else:
                yield entry.path
    except PermissionError:
        yield 'PermissionError reading {}'.format(path_name)

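Since scantree is a generator, a minimal usage sketch could look like the following; the "." start path and the print loop are illustrative, and is_ignored / DEFAULT_SKIP_LIST are assumed to come from the same module.

# Illustrative usage: lazily walk the current directory and print each file path.
for file_path in scantree('.', skip_list=[]):
    print(file_path)
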
def process_poetry(self, data_dir='/media/pony/DLdigest/data/languageModel/chinese-poetry/json'):
    save_dir = os.path.join(self.save_dir, 'poem')
    check_path_exists(save_dir)
    count = 0
    for entry in os.scandir(data_dir):
        if entry.name.startswith('poet'):
            with open(entry.path, 'r') as json_file:
                poems = json.load(json_file)
                for p in poems:
                    paras = HanziConv.toSimplified(''.join(p['paragraphs']).replace('\n', ''))
                    paras = filter_punctuation(paras)
                    for para in paras.split(' '):
                        if len(para.strip()) > 1:
                            pys = ' '.join(np.array(pinyin(para)).flatten())
                            with open(os.path.join(save_dir, str(count // 400000 + 1) + '.txt'), 'a') as f:
                                f.write(para + ',' + pys + '\n')
                            count += 1

def __init__(self, path, memmap_frames=False, verbose=False):
    self.path = _ospath.abspath(path)
    self.dir = _ospath.dirname(self.path)
    base, ext = _ospath.splitext(_ospath.splitext(self.path)[0])  # split two extensions as in .ome.tif
    base = _re.escape(base)
    pattern = _re.compile(base + '_(\d*).ome.tif')  # This matches the basename + an appendix of the file number
    entries = [_.path for _ in _os.scandir(self.dir) if _.is_file()]
    matches = [_re.match(pattern, _) for _ in entries]
    matches = [_ for _ in matches if _ is not None]
    paths_indices = [(int(_.group(1)), _.group(0)) for _ in matches]
    self.paths = [self.path] + [path for index, path in sorted(paths_indices)]
    self.maps = [TiffMap(path, verbose=verbose) for path in self.paths]
    self.n_maps = len(self.maps)
    self.n_frames_per_map = [_.n_frames for _ in self.maps]
    self.n_frames = sum(self.n_frames_per_map)
    self.cum_n_frames = _np.insert(_np.cumsum(self.n_frames_per_map), 0, 0)
    self.dtype = self.maps[0].dtype
    self.height = self.maps[0].height
    self.width = self.maps[0].width
    self.shape = (self.n_frames, self.height, self.width)

def hashsite(sitepath):
    hash = hashlib.sha256()

    def hashdir(dirpath, is_home):
        for entry in os.scandir(dirpath):
            if entry.is_file():
                if entry.name.endswith('~'):
                    continue
                mtime = os.path.getmtime(entry.path)
                hash.update(str(mtime).encode())
                hash.update(entry.name.encode())
            if entry.is_dir():
                if is_home and entry.name == 'out':
                    continue
                hashdir(entry.path, False)

    hashdir(sitepath, True)
    return hash.digest()

def profiles_last_modified(self, slicer):
    """
    Retrieves the last modification date of ``slicer``'s profiles.

    Args:
        slicer (str): the slicer for which to retrieve the last modification date

    Returns:
        (float) the time stamp of the last modification of the slicer's profiles
    """
    if not slicer in self.registered_slicers:
        raise UnknownSlicer(slicer)

    slicer_profile_path = self.get_slicer_profile_path(slicer)
    lms = [os.stat(slicer_profile_path).st_mtime]
    lms += [os.stat(entry.path).st_mtime for entry in scandir(slicer_profile_path) if entry.name.endswith(".profile")]
    return max(lms)

def _analysis_backlog_generator(self, path=None):
    if path is None:
        path = self.basefolder

    metadata = self._get_metadata(path)
    if not metadata:
        metadata = dict()
    for entry in scandir(path):
        if is_hidden_path(entry.name) or not octoprint.filemanager.valid_file_type(entry.name):
            continue

        if entry.is_file():
            if not entry.name in metadata or not isinstance(metadata[entry.name], dict) or not "analysis" in metadata[entry.name]:
                printer_profile_rels = self.get_link(entry.path, "printerprofile")
                if printer_profile_rels:
                    printer_profile_id = printer_profile_rels[0]["id"]
                else:
                    printer_profile_id = None

                yield entry.name, entry.path, printer_profile_id
        elif os.path.isdir(entry.path):
            for sub_entry in self._analysis_backlog_generator(entry.path):
                yield self.join_path(entry.name, sub_entry[0]), sub_entry[1], sub_entry[2]

def remove_folder(self, path, recursive=True):
    path, name = self.sanitize(path)

    folder_path = os.path.join(path, name)
    if not os.path.exists(folder_path):
        return

    empty = True
    for entry in scandir(folder_path):
        if entry.name == ".metadata.yaml":
            continue
        empty = False
        break

    if not empty and not recursive:
        raise StorageError("{name} in {path} is not empty".format(**locals()), code=StorageError.NOT_EMPTY)

    import shutil
    shutil.rmtree(folder_path)

    self._delete_metadata(folder_path)

def my_dir_walker_with_size_counting(topdir=None):
    if topdir is None:
        topdir = os.getcwd()
    sizes = {topdir: 0}  # path: size in bytes
    stack = []

    def inner_walker(new_topdir):
        stack.append(new_topdir)
        new_topdir_size = os.path.join(*stack)
        entries = os.scandir(new_topdir)
        size = 0
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                entry_size = inner_walker(entry.name)
                sizes[os.path.join(*stack)] = entry_size
                size += entry_size
                stack.pop()
            else:
                fpath = os.path.join(*stack, entry.name)
                sizes[fpath] = os.path.getsize(fpath)
                size += os.path.getsize(fpath)
        return size

    inner_walker(topdir)
    return sizes

def my_directory_walker_with_size_counting(topdir=None):
    if topdir is None:
        topdir = os.getcwd()
    sizes = {topdir: 0}
    root_stack = []
    current_root_size = 0

    def inner_walker(new_topdir):
        root_stack.append(new_topdir)
        new_topdir_path = os.path.join(*root_stack)
        # TODO: HOMEWORK: add error handling
        entries = os.scandir(new_topdir_path)
        size = 0
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                entry_size = inner_walker(entry.name)
                sizes[os.path.join(*root_stack)] = entry_size
                size += entry_size
                root_stack.pop()
            elif entry.is_file(follow_symlinks=False):
                sizes[os.path.join(*root_stack, entry.name)] = entry.stat().st_size
                size += entry.stat().st_size  # os.path.getsize
        return size

    inner_walker(topdir)
    return sizes

def clean_unused():
    """ A function to clear unreferenced media files. """
    if not hasattr(cache, 'delete_pattern'):
        # Abort if cache backend is not redis
        warnings.warn(
            'Unused files clearing aborted due to bad cache backend settings.')
        return
    _resolve_referenced_files(_fields_to_search())
    with os.scandir(settings.MEDIA_ROOT) as iterator:
        for entry in iterator:
            name = entry.name
            if not entry.is_file() or \
                    cache.get(_make_key(name)) is not None:
                continue
            default_storage.delete(name)

def get_random_image(img_dir):
    """Pick a random image file from a directory."""
    current_wall = wallpaper.get()
    current_wall = os.path.basename(current_wall)

    file_types = (".png", ".jpg", ".jpeg", ".jpe", ".gif",
                  ".PNG", ".JPG", ".JPEG", ".JPE", ".GIF")

    images = [img for img in os.scandir(img_dir)
              if img.name.endswith(file_types) and img.name != current_wall]

    if not images:
        print("image: No new images found (nothing to do), exiting...")
        sys.exit(1)

    return os.path.join(img_dir, random.choice(images).name)

def get_list(path):
    for file in os.scandir(path):
        stat = file.stat()
        perms = filemode(stat.st_mode)
        nlinks = stat.st_nlink
        if not nlinks:
            nlinks = 1
        size = stat.st_size
        try:
            uname = pwd.getpwuid(stat.st_uid).pw_name
        except:
            uname = 'owner'
        try:
            gname = grp.getgrgid(stat.st_gid).gr_name
        except:
            gname = 'group'
        mtime = time.gmtime(stat.st_mtime)
        mtime = time.strftime("%b %d %H:%M", mtime)
        mname = file.name
        yield "{} {} {} {} {} {} {}".format(perms, nlinks, uname, gname, size, mtime, mname)

def test_get_mlsx():
    path = os.path.join(os.getcwd(), "tests")
    result = {}
    temp = get_mlsx(path)
    for f in temp:
        f = f.split(";")
        filename = f[3].strip()
        result[filename] = {}
        for i in range(0, 3):
            name = f[i].split('=')[0]
            result[filename][name] = f[i].split('=')[1]
    files = os.scandir(path)
    for file in files:
        assert file.name in result.keys()
        result_file = result[file.name]
        stat = file.stat()
        assert result_file['modify'] == str(time.strftime("%Y%m%d%H%M%S", time.gmtime(stat[8])))
        assert result_file['size'] == str(stat[6])
        assert result_file['type'] == ("dir" if file.is_dir() else "file")

def __init__(self, dirName):
    """
    Args:
        dirName (string): directory where to load the corpus
    """
    self.MAX_NUMBER_SUBDIR = 200
    self.conversations = []
    __dir = os.path.join(dirName, "dialogs")
    number_subdir = 0
    for sub in tqdm(os.scandir(__dir), desc="Ubuntu dialogs subfolders",
                    total=len(os.listdir(__dir))):
        if number_subdir == self.MAX_NUMBER_SUBDIR:
            print("WARNING: Early stopping, only extracting {} directories".format(self.MAX_NUMBER_SUBDIR))
            return
        if sub.is_dir():
            number_subdir += 1
            for f in os.scandir(sub.path):
                if f.name.endswith(".tsv"):
                    self.conversations.append({"lines": self.loadLines(f.path)})

def _osu_files(path, recurse):
    """An iterator of ``.osu`` filepaths in a directory.

    Parameters
    ----------
    path : path-like
        The directory to search in.
    recurse : bool
        Recursively search ``path``?

    Yields
    ------
    path : str
        The path to a ``.osu`` file.
    """
    if recurse:
        for directory, _, filenames in os.walk(path):
            for filename in filenames:
                if filename.endswith('.osu'):
                    yield pathlib.Path(os.path.join(directory, filename))
    else:
        for entry in os.scandir(path):
            entry_path = entry.path
            if entry_path.endswith('.osu'):
                yield pathlib.Path(entry_path)

def _iterdir(dirname, dironly):
    if not dirname:
        if isinstance(dirname, bytes):
            dirname = bytes(os.curdir, 'ASCII')
        else:
            dirname = os.curdir
    try:
        it = scandir(dirname)
        for entry in it:
            try:
                if not dironly or entry.is_dir():
                    yield entry.name
            except OSError:
                pass
    except OSError:
        return

# Recursively yields relative pathnames inside a literal directory.

def _discover_sprites(self):
    plugin_path = offshoot.config["file_paths"]["plugins"]

    sprites = dict()
    sprite_path = f"{plugin_path}/{self.__class__.__name__}Plugin/files/data/sprites"

    if os.path.isdir(sprite_path):
        files = os.scandir(sprite_path)

        for file in files:
            if file.name.endswith(".png"):
                sprite_name = "_".join(file.name.split("/")[-1].split("_")[:-1]).replace(".png", "").upper()

                sprite_image_data = skimage.io.imread(f"{sprite_path}/{file.name}")
                sprite_image_data = sprite_image_data[:, :, :3, np.newaxis]

                if sprite_name not in sprites:
                    sprite = Sprite(sprite_name, image_data=sprite_image_data)
                    sprites[sprite_name] = sprite
                else:
                    sprites[sprite_name].append_image_data(sprite_image_data)

    return sprites

def reader(path, shuffle=True):
    files = []

    for img_file in os.scandir(path):
        if img_file.name.lower().endswith('.jpg') and img_file.is_file():
            files.append(img_file.path)

    if shuffle:
        # Shuffle the ordering of all image files in order to guarantee
        # random ordering of the images with respect to label in the
        # saved TFRecord files. Make the randomization repeatable.
        shuffled_index = list(range(len(files)))
        random.shuffle(files)

        files = [files[i] for i in shuffled_index]

    return files

def set_random_clitoris_texture():
    """Picks a random texture from the TEXTURE_DIRECTORY_PATH and assigns it to the clitoris material.
    """
    try:
        texture_names = list(entry.name for entry in os.scandir(TEXTURE_DIRECTORY_PATH)
                             if not entry.name.startswith('.') and entry.is_file())
        random_texture_name = choice(texture_names)
        texture_image = bpy.data.images.load(os.path.join(TEXTURE_DIRECTORY_PATH, random_texture_name))
        CLIT_MATERIAL.node_tree.nodes[TEXTURE_NODE_NAME].image = texture_image
        DEBUG_INFO["random_texture_name"] = random_texture_name
    except FileNotFoundError:
        print("Could not find the texture directory", file=sys.stderr)
    except IndexError:
        print("Texture directory is empty", file=sys.stderr)

def test_remote_directories_moved_to_trash(command):
    """Remote directories are moved to the trash.

    Remote directories are moved to the trash if the local directory
    contained only symlinks.
    """
    with open("remote/letters/upper/A.txt", "w") as file:
        file.write("A"*BLOCK_SIZE*5)
    with open("remote/letters/upper/B.txt", "w") as file:
        file.write("B"*BLOCK_SIZE*5)
    command.main()
    shutil.rmtree("local/letters/upper")
    command.main()

    remote_trash_names = [
        entry.name for entry in os.scandir(command.remote_dir.trash_dir)]
    assert "upper" in remote_trash_names

def check_excluded(
        self, paths: Iterable[str], start_path: str) -> Set[str]:
    """Get the paths that have been excluded by each client.

    Args:
        paths: The paths to check.
        start_path: The path of the directory to match globbing patterns
            against.

    Returns:
        The subset of input paths that have been excluded by each client.
    """
    pattern_files = []
    for entry in os.scandir(self._exclude_dir):
        pattern_files.append(ProfileExcludeFile(entry.path))

    rm_files = set()
    for path in paths:
        for pattern_file in pattern_files:
            if path not in pattern_file.all_matches(start_path):
                break
        else:
            rm_files.add(path)

    return rm_files

def relativeWalk(path, startPath=None):
    if startPath is None:
        startPath = path
    # strxfrm -> locale-aware sorting - https://docs.python.org/3/howto/sorting.html#odd-and-ends
    for entry in sorted(os.scandir(path), key=lambda x: locale.strxfrm(x.name)):
        try:
            # print(entry.path, " ----- ", entry.name)
            if entry.is_file():
                yield os.path.relpath(entry.path, startPath), False
            elif entry.is_dir():
                yield os.path.relpath(entry.path, startPath), True
                yield from relativeWalk(entry.path, startPath)
            else:
                logging.error("Encountered an object which is neither directory nor file: " + entry.path)
        except OSError as e:
            logging.error(e)

# Possible actions:
# copy (always from source to target),
# delete (always in target)
# hardlink (always from compare directory to target directory)
# rename (always in target) (2-variate) (only needed for move detection)
# hardlink2 (always from compare directory to target directory) (2-variate) (only needed for move detection)

def gather_candidates(self, context):
    candidates = []
    for directory in context['__folders']:
        if not os.access(directory, os.X_OK):
            continue
        base = os.path.basename(directory)
        items = os.scandir(directory)
        for item in items:
            if item.name[0] == '.':
                continue
            candidates.append({
                'word': item.name,
                'abbr': '%-14s %-20s' % (base, item.name),
                'source__root': item.path,
                'source__mtime': item.stat().st_mtime
            })
    candidates = sorted(candidates, key=itemgetter('source__mtime'), reverse=True)
    return candidates

def gather_candidates(self, context):
    root = context['__root']
    candidates = []
    items = os.scandir(root)
    now = time.time()
    for item in items:
        if item.is_file():
            mtime = item.stat().st_mtime
            extname = os.path.splitext(item.name)[0]
            candidates.append({
                'word': '%s (%s)' % (extname, ago(now, mtime)),
                'action__path': item.path,
                'source_mtime': mtime
            })
    candidates = sorted(candidates, key=lambda item: item['source_mtime'], reverse=True)
    return candidates

def main():
    results = arg_parser.parse_args()
    if re.match('^.+\.sqlite$', results.database):
        results.database = re.sub('^(.+)\.sqlite$', '\\1', results.database)
    try:
        json_filenames = results.file if results.file else \
            [f.path for f in scandir() if f.is_file() and os.path.splitext(f.path)[1].lower() == '.json']
        if not json_filenames:
            raise NoFilesProvidedError
    except NoFilesProvidedError:
        sys.exit(2)
    else:
        edb = EDataSQLDatabase(database=results.database, verbose=results.verbose)
        for f in [f for f in json_filenames if check_file(f)]:
            edb.import_file(f)

async def _scan(self, root, media_dirs=None):
    i = 0
    with os.scandir(root) as it:
        for entry in it:
            i += 1
            if i > SPEED_LIMIT:
                # this allows the event loop to update
                await asyncio.sleep(SPEED_WAIT_SEC)
                i = 0
            if self.stopped or self.interrupted:
                return
            if media_dirs:
                if entry.name in media_dirs:
                    await self._scan(entry.path, media_dirs=None)
            elif not entry.name.startswith('.'):
                if entry.is_dir(follow_symlinks=False):
                    await self._scan(entry.path, media_dirs=None)
                elif entry.name.rpartition('.')[2] in VIDEO_FILES_EXT:
                    await self._refresh_video(entry.path)

def load_from_dir(self, dirname, recurse=False, **check_args):
    checks = []
    for entry in os.scandir(dirname):
        if recurse and entry.is_dir():
            checks.extend(
                self.load_from_dir(entry.path, recurse, **check_args)
            )

        if (entry.name.startswith('.') or
                not entry.name.endswith('.py') or
                not entry.is_file()):
            continue

        checks.extend(self.load_from_file(entry.path, **check_args))

    return checks

def create_arff(self):
    """ Create Arff file with a timestamp
    :return: string name of file
    """
    mdate = datetime.now().strftime('%d-%m-%Y_%H_%M_%S')
    my_arff_file = os.path.join("./arff_file/", mdate + ".arff")
    os.makedirs(os.path.dirname(my_arff_file), exist_ok=True)  # create folder if it does not exist
    with open(my_arff_file, mode='w', encoding='utf-8') as output:
        output.write("@relation " + self.relation + "\n")
        output.write("\n")
        for key, value in self.attribute_list.items():
            output.write("@attribute " + key + " numeric\n")
        output.write("@attribute folders {%s}\n" % ', '.join(
            '{}'.format(f.name) for f in os.scandir(self.folderpath) if f.is_dir()))
        output.write("\n")
        output.write("@data\n")
        for key, counter_value in sorted(self.counterList.items()):
            # populate data part of the file with count of each word
            joker_key = os.path.split(os.path.split(key)[0])[1]
            line = ', '.join('{}'.format(counter_value[w]) for w in self.search_list)
            line += ', ' + joker_key
            output.write(line + "\n")
    return my_arff_file

def dir_is_empty(path):
    """
    Check if the given directory is empty.
    May raise a FileNotFoundError or a NotADirectoryError exception.
    """
    for entry in os.scandir(path):
        return False
    return True

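A small usage sketch of the function above, handling the documented exceptions; the "downloads" path and the messages are illustrative only.

# Illustrative only: "downloads" is a made-up path.
try:
    if dir_is_empty("downloads"):
        print("downloads is empty")
except FileNotFoundError:
    print("downloads does not exist")
except NotADirectoryError:
    print("downloads is not a directory")
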
def file_set_readonly(path, enable, follow_symlinks=True, recursive=False):
    """Apply or remove the read-only property of a given file or directory."""
    st_mode = os.stat(path, follow_symlinks=follow_symlinks).st_mode
    new_attr = (
        (st_mode | stat.S_IREAD) & ~stat.S_IWRITE if enable
        else (st_mode | stat.S_IWRITE) & ~stat.S_IREAD)
    if new_attr != st_mode:
        os.chmod(path, new_attr)

    if recursive and stat.S_ISDIR(st_mode):
        for entry in os.scandir(path):
            file_set_readonly(entry.path, enable, follow_symlinks, recursive)