我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用distutils.dir_util.copy_tree()。
def test_copy_tree_verbosity(self):
    """copy_tree() must be silent at verbose=0 and log exactly one
    'copying src -> dst' line per copied file at verbose=1."""
    mkpath(self.target, verbose=0)

    # Copying an (empty) tree with verbose=0 must produce no log output.
    copy_tree(self.target, self.target2, verbose=0)
    self.assertEqual(self._logs, [])

    remove_tree(self.root_target, verbose=0)

    # Recreate the source tree, this time with one real file in it.
    mkpath(self.target, verbose=0)
    a_file = os.path.join(self.target, 'ok.txt')
    with open(a_file, 'w') as f:
        f.write('some content')

    wanted = ['copying %s -> %s' % (a_file, self.target2)]
    # With verbose=1 the single file copy must be logged, and nothing else.
    copy_tree(self.target, self.target2, verbose=1)
    self.assertEqual(self._logs, wanted)

    remove_tree(self.root_target, verbose=0)
    remove_tree(self.target2, verbose=0)
def datadir(tmpdir, request):
    '''
    Fixture responsible for locating the test data directory and copying it
    into a temporary directory.
    '''
    module_dir = os.path.dirname(request.module.__file__)
    # Mirror <test module dir>/data into the per-test temporary directory.
    dir_util.copy_tree(os.path.join(module_dir, 'data'), str(tmpdir))

    def getter(filename, as_str=True):
        # Resolve *filename* inside the temporary copy; return a plain
        # string path unless the caller asks for the py.path object.
        filepath = tmpdir.join(filename)
        return str(filepath) if as_str else filepath

    return getter
def copy_tree(self, infile, outfile, preserve_mode=1, preserve_times=1,
              preserve_symlinks=0, level=1):
    """Copy the directory tree *infile* onto *outfile*.

    Delegates to distutils.dir_util.copy_tree while honoring the
    command's ``force`` and ``dry_run`` flags.  ``level`` is accepted
    for distutils API compatibility and is not forwarded.
    """
    # ``update`` is the inverse of ``force``: when not forcing, only
    # files newer than their destination counterparts are copied.
    return dir_util.copy_tree(
        infile,
        outfile,
        preserve_mode=preserve_mode,
        preserve_times=preserve_times,
        preserve_symlinks=preserve_symlinks,
        update=not self.force,
        dry_run=self.dry_run,
    )
def do_step(context):
    """Provision the BOSH user's home directory.

    Stages the ./bosh payload and the selected manifest into
    /home/<username>, then fixes ownership of the tree and locks down
    the private key's permissions.  Returns the (unmodified) context.
    """
    cfg = context.meta['settings']
    manifest_name = context.meta['index-file']
    user = cfg["username"]
    home = os.path.join("/home", user)

    # Copy all the files in ./bosh into the home directory
    dir_util.copy_tree("./bosh/", home)
    copy("./manifests/{0}".format(manifest_name), "{0}/manifests/".format(home))

    # The user must own the whole tree; the bosh key must be 0400.
    call("chown -R {0} {1}".format(user, home), shell=True)
    call("chmod 400 {0}/bosh".format(home), shell=True)

    return context
def install_overrides(self, destination):
    """Copy the modpack's override files into *destination*.

    FIXME: distutils.dir_util.copy_tree keeps internal state across calls;
    if the destination directory disappears afterwards, a later call will
    not recreate the missing path components.  Not an issue in practice:
    this script installs one modpack and exits.
    """
    overrides_dir = os.path.join(self.unpack_directory,
                                 self.manifest["overrides"])
    copy_tree(overrides_dir, destination)
def lp_init(self, pf_w_path, providers=['libvirt']):
    """
    Initializes a linchpin project. Creates the necessary directory
    structure, includes PinFile, topologies and layouts for the given
    provider. (Default: Libvirt. Other providers not yet implemented.)

    :param pf_w_path: Path to where the PinFile might exist. Gets created
    if it doesn't exist.

    :param providers: A list of providers for which templates (and a
    target) will be provided into the workspace.
    NOT YET IMPLEMENTED
    """
    # NOTE(review): mutable default argument, and `providers` is currently
    # unused in this body (matches the NOT YET IMPLEMENTED note above).
    src = self.get_cfg('init', 'source', 'templates/')
    src_w_path = os.path.realpath('{0}/{1}'.format(self.ctx.lib_path, src))
    # The template tree ships an example PinFile that gets renamed into place.
    src_pf = os.path.realpath('{0}.lp_example'.format(pf_w_path))
    try:
        if os.path.exists(pf_w_path):
            # Refuse to clobber an existing PinFile without confirmation.
            # NOTE(review): adjacent literals produce "exists,overwrite"
            # with no space between them — probably unintended.
            if not click.confirm('{0} already exists,'
                                 'overwrite it?'.format(pf_w_path),
                                 default=False):
                sys.exit(0)
        dir_util.copy_tree(src_w_path, self.workspace, verbose=1)
        os.rename(src_pf, pf_w_path)
        self.ctx.log_state('{0} and file structure created at {1}'.format(
            self.pinfile, self.workspace))
    except Exception as e:
        # Any failure is logged to the linchpin context and aborts the CLI.
        self.ctx.log_state('Error: {0}'.format(e))
        sys.exit(1)
def do_activate(self, *args):
    """Activate the Clang linter plugin.

    Mirrors the project into a scratch directory and, when live linting
    is enabled, hooks the source buffer's ``changed`` signal so edits
    re-trigger linting.
    """
    print('LinterClang activated')
    self.enabled = True
    # Lint against a throwaway copy so in-editor changes never touch the
    # real project tree on disk.
    dir_util.copy_tree(self.parent.projectPath, '/tmp/pyidetmp')
    if self.live:
        # Keep the connection id so the signal can be disconnected later.
        self.connection = self.parent.sbuff.connect('changed',
                                                    self.set_file_changed)
        self.do_live_linting()
def replace_tools_from_product_repo(node_list, **kwargs):
    """
    This will clone the tools from product repo and then replaces tools
    directory in warrior main with this tools repo.
    """
    logfile = kwargs.get("logfile")
    config_file_name = kwargs.get("config_file_name")
    console_log_name = kwargs.get("console_log_name")
    print_log_name = kwargs.get("print_log_name")
    if "tools" in node_list:
        # Read the <tools> and <warriorframework> nodes from the config file.
        tools_node = get_node(config_file_name, "tools")
        tools_url = get_attribute_value(tools_node, "url")
        tools_root = get_repository_name(tools_url)
        tools_clone = get_attribute_value(tools_node, "clone")
        tools_base_path = ""
        warrior_node = get_node(config_file_name, "warriorframework")
        warrior_base_path = get_attribute_value(warrior_node, "destination")
        # Only proceed when a tools repo URL is configured and cloning is
        # explicitly enabled.
        if tools_url and tools_clone == "yes":
            # Normalize/validate both base paths before touching the disk.
            tools_base_path = validate_base_path(
                tools_base_path, logfile=logfile,
                config_file_name=config_file_name,
                console_log_name=console_log_name,
                print_log_name=print_log_name)
            warrior_base_path = validate_base_path(
                warrior_base_path, logfile=logfile,
                config_file_name=config_file_name,
                console_log_name=console_log_name,
                print_log_name=print_log_name)
            warrior_tools_path = os.path.join(warrior_base_path,
                                              "warrior", "Tools")
            product_tools_path = os.path.join(tools_base_path,
                                              tools_root, "Tools")
            # Overlay the product repo's Tools onto warrior's Tools;
            # update=1 means only newer files overwrite existing ones.
            dir_util.copy_tree(product_tools_path, warrior_tools_path,
                               update=1)
            # The cloned product repo is no longer needed once copied.
            delete_directory(os.path.join(tools_base_path, tools_root),
                             logfile, print_log_name)
def do_merge(backups, precision, dry_run, include_latest, skip_errors, **kwargs):
    """Merge backup folders whose timestamps fall within ``precision``
    hours of each other.

    Each group is merged into its oldest member and the merged-away
    folders are removed.  The most recent group is skipped unless
    ``include_latest`` is set, since it may still be being written.
    """
    data = sorted(os.listdir(backups))
    if not data:
        raise MergeError("Nothing found.")
    # Two iterators over the same grouping: `helper` runs one group ahead
    # of `groups`, so exhausting it tells us the current group is the last.
    groups, helper = tee(group_by_timestamp(data, precision * 60 * 60,
                                            skip_errors))
    next(helper, None)
    for group in groups:
        try:
            next(helper)
        except StopIteration:
            # `group` is the latest group; by default it is excluded
            # because it may not be complete yet.
            if not include_latest:
                logger.info("Folders `{0}` were excluded from merge because "
                            "they can be not "
                            "complete.".format(', '.join(group)))
                continue
        logger.info("GROUP: {0}".format(group))
        # Refuse to merge groups whose members contain the same file paths —
        # that indicates the grouping window exceeded the backup period.
        if will_override(backups, group):
            raise MergeError("Group `{0}` contains overlapping files. May be "
                             "precision was bigger then backup "
                             "periodicity.".format(group))
        # Merge everything into the group's first (oldest) folder.
        dst = os.path.join(backups, group.pop(0))
        for item in group:
            src = os.path.join(backups, item)
            dir_util.copy_tree(src, dst, verbose=True, dry_run=dry_run)
            dir_util.remove_tree(src, verbose=True, dry_run=dry_run)
def _commit(self): """Sync the temporary directory to the final path. """ dir_util.copy_tree(self.path, self._final_path)
def merge_tree(self, dst, symlinks=False, *args, **kwargs):
    """
    Copy entire contents of self to dst, overwriting existing
    contents in dst with those in self.

    If the additional keyword `update` is True, each
    `src` will only be copied if `dst` does not exist, or `src` is
    newer than `dst`.

    Note that the technique employed stages the files in a temporary
    directory first, so this function is not suitable for merging
    trees with large files, especially if the temporary directory
    is not capable of storing a copy of the entire source tree.
    """
    # `update` belongs to dir_util.copy_tree, not to copytree; pull it
    # out before forwarding the remaining kwargs.
    update = kwargs.pop('update', False)
    with tempdir() as _temp_dir:
        # first copy the tree to a stage directory to support
        # the parameters and behavior of copytree.
        # (hash(self) just gives the stage a unique folder name;
        # `/` here is path-class division, not arithmetic.)
        stage = _temp_dir / str(hash(self))
        self.copytree(stage, symlinks, *args, **kwargs)
        # now copy everything from the stage directory using
        # the semantics of dir_util.copy_tree
        dir_util.copy_tree(stage, dst, preserve_symlinks=symlinks,
                           update=update)


#
# --- Special stuff from os
def copy_tree(self, infile, outfile, preserve_mode=1, preserve_times=1,
              preserve_symlinks=0, level=1):
    """Recursively copy *infile* to *outfile*, driven by this command's
    ``force`` and ``dry_run`` settings.

    ``level`` exists only for distutils API compatibility.
    """
    # Without --force, existing files are only replaced by newer ones.
    update_only = not self.force
    return dir_util.copy_tree(infile, outfile, preserve_mode,
                              preserve_times, preserve_symlinks,
                              update_only, dry_run=self.dry_run)
def test_copy_tree_skips_nfs_temp_files(self):
    """copy_tree() must ignore NFS silly-rename artifacts (.nfsXXXX)."""
    mkpath(self.target, verbose=0)

    a_file = os.path.join(self.target, 'ok.txt')
    # `.nfs*` files are transient client-side artifacts and must not be
    # copied along with the tree.
    nfs_file = os.path.join(self.target, '.nfs123abc')
    for f in a_file, nfs_file:
        with open(f, 'w') as fh:
            fh.write('some content')

    copy_tree(self.target, self.target2)
    # Only the regular file should have made it across.
    self.assertEqual(os.listdir(self.target2), ['ok.txt'])

    remove_tree(self.root_target, verbose=0)
    remove_tree(self.target2, verbose=0)
def test_copy_tree_exception_in_listdir(self):
    """
    An exception in listdir should raise a DistutilsFileError
    """
    # Any OSError from os.listdir must be translated into a
    # DistutilsFileError before it reaches the caller.
    with patch("os.listdir", side_effect=OSError()), \
         self.assertRaises(errors.DistutilsFileError):
        src = self.tempdirs[-1]
        dir_util.copy_tree(src, None)
def download_and_extract_dataset(self):
    """Fetch the OpenOMR dataset (unless cached), unpack it into a
    scratch folder, and mirror it into ``self.destination_directory``."""
    dataset_archive = self.get_dataset_filename()
    if not os.path.exists(dataset_archive):
        # Only hit the network when the archive is not already present.
        print("Downloading OpenOMR dataset...")
        self.download_file(self.get_dataset_download_url(), dataset_archive)

    print("Extracting OpenOMR dataset...")
    temp_folder = os.path.abspath('OpenOmrDataset')
    self.extract_dataset(temp_folder)

    os.makedirs(self.destination_directory, exist_ok=True)
    dir_util.copy_tree(os.path.join(temp_folder, "OpenOMR-Dataset"),
                       self.destination_directory)
    self.clean_up_temp_directory(temp_folder)
def download_and_extract_dataset(self):
    """Fetch the Fornes Music Symbol dataset (unless cached), unpack it,
    normalize capitalized file extensions, and mirror the symbols into
    ``self.destination_directory``."""
    dataset_archive = self.get_dataset_filename()
    if not os.path.exists(dataset_archive):
        # Only hit the network when the archive is not already present.
        print("Downloading Fornes Music Symbol dataset...")
        self.download_file(self.get_dataset_download_url(), dataset_archive)

    print("Extracting Fornes Music Symbol dataset...")
    temp_folder = os.path.abspath('Fornes-Music-Symbols')
    self.extract_dataset(temp_folder)
    # The archive ships some files with upper-case extensions; fix them
    # before copying.
    self.__fix_capital_file_endings(temp_folder)

    os.makedirs(self.destination_directory, exist_ok=True)
    dir_util.copy_tree(os.path.join(temp_folder, "Music_Symbols"),
                       self.destination_directory)
    self.clean_up_temp_directory(temp_folder)
def download_and_extract_dataset(self):
    """Fetch Rebelo Symbol Dataset 2 (unless cached), unpack it into a
    scratch folder, and mirror it into ``self.destination_directory``."""
    dataset_archive = self.get_dataset_filename()
    if not os.path.exists(dataset_archive):
        # Only hit the network when the archive is not already present.
        print("Downloading Rebelo Symbol Dataset 2...")
        self.download_file(self.get_dataset_download_url(), dataset_archive)

    print("Extracting Rebelo Symbol Dataset 2...")
    temp_folder = os.path.abspath('Rebelo-Music-Symbol-Dataset2')
    self.extract_dataset(temp_folder)

    os.makedirs(self.destination_directory, exist_ok=True)
    dir_util.copy_tree(os.path.join(temp_folder, "database2"),
                       self.destination_directory)
    self.clean_up_temp_directory(temp_folder)
def download_and_extract_dataset(self):
    """Fetch the Printed Music Symbol dataset (unless cached), unpack it,
    and mirror it into ``self.destination_directory``."""
    dataset_archive = self.get_dataset_filename()
    if not os.path.exists(dataset_archive):
        # Only hit the network when the archive is not already present.
        print("Downloading Printed Music Symbol dataset...")
        self.download_file(self.get_dataset_download_url(), dataset_archive)

    print("Extracting Printed Music Symbol dataset...")
    temp_folder = os.path.abspath('PrintedMusicSymbolsDataset')
    self.extract_dataset(temp_folder)

    os.makedirs(self.destination_directory, exist_ok=True)
    dir_util.copy_tree(os.path.join(temp_folder, "PrintedMusicSymbolsDataset"),
                       self.destination_directory)
    self.clean_up_temp_directory(temp_folder)
def download_and_extract_dataset(self):
    """Fetch Rebelo Symbol Dataset 1 (unless cached), unpack it into a
    scratch folder, and mirror it into ``self.destination_directory``."""
    dataset_archive = self.get_dataset_filename()
    if not os.path.exists(dataset_archive):
        # Only hit the network when the archive is not already present.
        print("Downloading Rebelo Symbol Dataset 1...")
        self.download_file(self.get_dataset_download_url(), dataset_archive)

    print("Extracting Rebelo Symbol Dataset 1...")
    temp_folder = os.path.abspath('Rebelo-Music-Symbol-Dataset1')
    self.extract_dataset(temp_folder)

    os.makedirs(self.destination_directory, exist_ok=True)
    dir_util.copy_tree(os.path.join(temp_folder, "database1"),
                       self.destination_directory)
    self.clean_up_temp_directory(temp_folder)
def clone_dir_with_timestap(orig_dir_path):
    """Copy a folder into the same directory and append a timestamp.

    Returns the path of the newly created directory.  Copy errors are
    logged and swallowed; the (possibly empty) new directory path is
    returned regardless — this best-effort contract is intentional.
    """
    new_dir = create_dir(append_timestamp(orig_dir_path))
    try:
        du.copy_tree(orig_dir_path, new_dir)
    except Exception as e:  # fixed: was Python-2-only `except Exception, e`
        wl_log.error("Error while cloning the dir with timestamp" + str(e))
    finally:
        # NOTE: `return` inside `finally` deliberately suppresses any
        # exception not caught above, preserving best-effort behavior.
        return new_dir
def copy_sstables(self, cluster, node):
    """Snapshot every non-system keyspace directory of *node*.

    For each data directory ``data<i>`` (``i`` in
    ``[0, cluster.data_dir_count)``), copies every keyspace folder except
    ``system`` into a sibling ``data<i>_copy`` tree.
    """
    # fixed: `range` replaces the Python-2-only `xrange`; iteration order
    # and values are identical.
    for x in range(0, cluster.data_dir_count):
        data_dir = os.path.join(node.get_path(), 'data{0}'.format(x))
        copy_root = os.path.join(node.get_path(), 'data{0}_copy'.format(x))
        for ddir in os.listdir(data_dir):
            keyspace_dir = os.path.join(data_dir, ddir)
            # Skip plain files and the internal `system` keyspace.
            if os.path.isdir(keyspace_dir) and ddir != 'system':
                copy_dir = os.path.join(copy_root, ddir)
                dir_util.copy_tree(keyspace_dir, copy_dir)
def copy_layout(self, src, dst):
    """Mirror the layout directory *src* onto *dst* and return the list
    of files copied."""
    self.logger.info("Copying layout \"%s\" on \"%s\"", src, dst)
    copied_files = copy_tree(src, dst)
    return copied_files
def copyDependencies():
    """Stage the build's dependency jars into the Tomcat lib directory."""
    dependency_dir = target("dependencies")
    copy_tree(dependency_dir, tomcatLib)
def deploy():
    """Web2py admin action: copy the selected applications into a local
    OpenShift repo checkout, commit, and push.

    Fixes over the original:
    - removed the bare ``origin.push`` attribute access (a no-op
      statement) that preceded the real ``origin.push()`` call;
    - removed unused locals (``ignore_apps``, ``regex``, ``w2p_origin``,
      the unused commit binding);
    - ``item not in`` idiom.
    """
    apps = sorted(file for file in os.listdir(apath(r=request)))
    form = SQLFORM.factory(
        Field(
            'osrepo', default='/tmp',
            label=T('Path to local openshift repo root.'),
            requires=EXISTS(error_message=T('directory not found'))),
        Field('osname', default='web2py',
              label=T('WSGI reference name')),
        Field('applications', 'list:string',
              requires=IS_IN_SET(apps, multiple=True),
              label=T('web2py apps to deploy')))
    cmd = output = errors = ""
    if form.accepts(request, session):
        # Best-effort: stop any running instance; failures are ignored
        # deliberately (kill() may not have anything to kill).
        try:
            kill()
        except:
            pass
        osrepo = form.vars.osrepo
        osname = form.vars.osname
        # Git code starts here
        repo = Repo(form.vars.osrepo)
        index = repo.index
        assert repo.bare == False
        for i in form.vars.applications:
            appsrc = os.path.join(apath(r=request), i)
            appdest = os.path.join(osrepo, 'wsgi', osname, 'applications', i)
            dir_util.copy_tree(appsrc, appdest)
            index.add(['wsgi/' + osname + '/applications/' + i])
        index.commit("Deploy from Web2py IDE")
        origin = repo.remotes.origin
        origin.push()
        # Git code ends here
    return dict(form=form, command=cmd)
def run(self, args):
    """Collect late submissions listed in ``args.submissions``.

    Each input line is ``email,late_days,sha``.  For every entry the
    student's repo is cloned at *sha*, the assignment directory is copied
    into ``args.destination``, and a comment is left on the collected
    commit.  Failed entries are retried manually via ./failures.txt.

    Bug fixed: the failure report previously wrote the loop variables
    from the *last* iteration for every row; it now writes each stored
    failure tuple.
    """
    assignment = args.assignment
    logger.info('Collecting late submissions for %s', assignment)
    config = Config.load_config()
    github = config.github
    failures = []
    for line in args.submissions:
        email, late_days, sha = line.strip().split(',')
        logger.info('Collecting %s from %s for %s, %s late day(s)',
                    sha, email, assignment, late_days)
        try:
            student = Student(email, None)
            # clone the repo into the subdirectory at args.destination
            dest_dir = path.join(args.destination, student.unix_name)
            with tempfile.TemporaryDirectory() as tmpdir:
                # clone the repo into a temp directory and copy the
                # assignment dir to the submissions dir
                repo = git.Repo.clone_from(student.repo_url, tmpdir)
                # make sure we got the right sha
                repo.git.checkout(sha)
                dir_util.copy_tree(path.join(tmpdir, assignment), dest_dir)
                # comment on the commit that we collected; must use the
                # fully qualified repo name with the org
                repo_fqn = config.github_org + '/' + student.repo_name
                repo = github.get_repo(repo_fqn)
                commit = repo.get_commits(sha=sha)[0]
                comment = 'This commit was collected as part of "{}". \n \
This was a late submission using {} late day(s). If you think this is \
incorrect, please post on Piazza.'.format(assignment, late_days)
                logger.debug('Commenting on GitHub')
                commit.create_comment(comment)
        except Exception as e:
            logger.error('Failed for %s, %s', email, e)
            failures.append((email, late_days, sha))
    if failures:
        logger.error('Failed on %d students, please try them again',
                     len(failures))
        logger.error('Failures have been written out to ./failures.txt')
        with open('failures.txt', 'w') as f:
            # fixed: write each failed tuple, not the last loop variables
            for failed_email, failed_days, failed_sha in failures:
                f.write('{},{},{}\n'.format(failed_email, failed_days,
                                            failed_sha))
def run(self, args):
    """Collect *assignment* from every student in ``args.students``.

    Clones each student's repo (at the last master commit before
    ``args.deadline`` when given, otherwise at HEAD), copies the
    assignment directory into ``args.destination``, comments on the
    collected commit, records failures, and finally commits and pushes
    the meta repo.

    Bug fixed: when a deadline was given, the deadline sha was computed
    but never checked out, so the files copied were from HEAD rather
    than the deadline commit.  A ``checkout(sha)`` now pins the working
    tree to the collected commit (matching the late-collection command).
    """
    assignment = args.assignment
    students = self.load_students(args.students)
    logger.info('Collecting %s from %d students', assignment, len(students))
    os.makedirs(args.destination, exist_ok=True)
    meta_repo = Repo.meta_repo()
    config = Config.load_config()
    github = config.github
    failures = []
    for student in students:
        try:
            # clone the repo into the subdirectory at args.destination
            dest_dir = path.join(args.destination, student.unix_name)
            with tempfile.TemporaryDirectory() as tmpdir:
                # clone the repo into a temp directory and copy the
                # assignment dir to the submissions dir
                repo = git.Repo.clone_from(student.repo_url, tmpdir)
                # if we have a deadline, collect the last commit on
                # master before that time; otherwise collect HEAD
                if args.deadline is not None:
                    rev_list = getattr(repo.git, 'rev-list')
                    sha = rev_list('master', n=1, before=args.deadline)
                else:
                    sha = repo.head.commit.hexsha
                # fixed: pin the working tree to the collected commit
                repo.git.checkout(sha)
                dir_util.copy_tree(path.join(tmpdir, assignment), dest_dir)
            logger.info('Collected %s at %s into %s',
                        student.unix_name, sha, dest_dir)
            # comment on the commit that we collected; must use the
            # fully qualified repo name with the org
            repo_fqn = config.github_org + '/' + student.repo_name
            repo = github.get_repo(repo_fqn)
            commit = repo.get_commits(sha=sha)[0]
            comment = 'This commit was collected as part of "{}". \n \
If you think this was a mistake or you want to submit this assignment late, \
please fill out the late form.'.format(assignment)
            commit.create_comment(comment)
        except Exception as e:
            logger.error('Failed to collect %s', student.unix_name)
            logger.error(e)
            failures.append(student)
    if failures:
        logger.error('Failed on %d students, please try them again',
                     len(failures))
        logger.error('Failures have been written out to ./failures.txt')
        with open('failures.txt', 'w') as f:
            for s in failures:
                f.write(s.email + ' ' + s.github + '\n')
    # commit all the submissions at once
    meta_repo.index.commit('Collected {} from {} students'
                           .format(assignment, len(students)))
    meta_repo.remote().push()
def before_buildfs(source, target, env):
    """PlatformIO pre-buildfs hook (Python 2).

    Stages the global `www` tree into the SPIFFS `data` folder, then —
    depending on the flags in the global `options` string — inlines
    CSS/JS/media into the HTML files and gzips compressible assets.
    """
    print "before_buildfs"

    # SPIFFS Stats With Different Combinations of Processing
    # Updated: 12.28.2016
    # No Processing
    # 20 Files, 1.46 MB of 2.81 MB Used
    # custom_option = "gz"
    # 19 Files, 898.84 KB of 2.81 MB Used
    # custom_option = "gz|css"
    # 17 Files, 896.88 KB of 2.81 MB Used
    # custom_option = "gz|css|js"
    # 13 Files, 893.94 KB of 2.81 MB Used
    # custom_option = "gz|css|js|media"
    # 8 Files, 898.60 KB of 2.81 MB Used

    # clone 'www' folder to 'data' folder
    files = dir_util.copy_tree(www, data, )

    # embed Javascript, CSS & media into html files
    if re.search(r'css|js|media', options):
        for file in files:
            if re.search(r'\.htm', file):
                print file
                content = read_file(file)
                if re.search(r'css', options):
                    content = embed_css( content )
                if re.search(r'js', options):
                    content = combine_js( content )
                if re.search(r'media', options):
                    content = embed_media( content )
                # Save New HTML File
                with open(file, 'w') as new_file:
                    new_file.write( content )
                    # NOTE(review): redundant — the `with` block already
                    # closes the file on exit.
                    new_file.close()

    # gzip appropriate files
    if re.search(r'gz', options):
        # Only these extensions are worth compressing for SPIFFS.
        pattern = re.compile(ur'\.htm|\.css|\.js|\.map|\.svg|\.ico')
        for file in files:
            if re.search(pattern, file):
                if os.path.exists(file):
                    print file
                    gzFile( file )
    # remove 'data' folder after upload
def install(self, env):
    """Ambari lifecycle hook (Python 2): install the Tomcat Master.

    Creates the service user/group if missing, recreates CATALINA_HOME
    from the bundled tomcat package, fixes ownership/permissions, and
    installs OS packages.
    """
    print 'Install the Tomcat Master.'

    # Load the all configuration files
    config = Script.get_config()
    tomcat_user = config['configurations']['common-env']['user']
    tomcat_group = config['configurations']['common-env']['group']
    tomcat_home = config['configurations']['common-env']['catalina_home']
    # The tomcat payload ships next to this script in the service package.
    tomcat_package = os.path.join(os.path.dirname(__file__), '..', 'tomcat')

    # Create user and group (if they don't exist)
    try:
        grp.getgrnam(tomcat_group)
    except KeyError:
        Execute(format('groupadd {tomcat_group}'))
    try:
        pwd.getpwnam(tomcat_user)
    except KeyError:
        Execute(format('adduser {tomcat_user} -g {tomcat_group}'))

    # Recreate CATALINA_HOME from scratch before copying the package in.
    Execute(format('mkdir -p {tomcat_home}'), user=tomcat_user, group=tomcat_group)
    Execute(format('rm -r {tomcat_home}'))
    #copy_tree(tomcat_package, tomcat_home)
    #os.chown(tomcat_home, tomcat_user, tomcat_group)
    os.system(format('cp -a {tomcat_package} {tomcat_home}'))
    os.system(format('mkdir {tomcat_home}/logs'))
    os.system(format('mkdir {tomcat_home}/work'))
    os.system(format('chown {tomcat_user}:{tomcat_group} -R {tomcat_home}'))
    os.system(format('chmod ug+x {tomcat_home}/bin/*.sh'))
    # NOTE(review): uid/gid are resolved but unused below — the chown is
    # done via os.system above instead.
    uid = pwd.getpwnam(tomcat_user).pw_uid
    gid = grp.getgrnam(tomcat_group).gr_gid
    #copytree(tomcat_package, tomcat_home)
    #os.chown(tomcat_package, uid, gid)
    #Execute(format('cp -a {tomcat_package} {tomcat_home}'))

    # Install packages
    self.install_packages(env)
    #Directory([tomcat_home],
    #          recursive=True)
    #Execute('cp -r %s/* %s' % (tomcat_package, tomcat_home))

    # Create a new user and group
    #Execute( format("groupadd -f {tomcat_user}") )
    #Execute( format("id -u {tomcat_user} &>/dev/null || useradd -s /bin/bash {tomcat_user} -g {tomcat_user}") )

    ### Continue installing and configuring your service
    print 'Installation complete.'