The following 50 code examples, extracted from open-source Python projects, illustrate how to use shutil.copyfile().
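For reference before the project examples: shutil.copyfile(src, dst) copies only the contents of src to dst. dst must be the complete target file name (not a directory), metadata such as permissions and timestamps are not copied, and copying a file onto itself raises an error. A minimal sketch, with hypothetical file names:

import shutil

# Copy file contents only; use shutil.copy2() instead if metadata
# (permissions, timestamps) should be preserved as well.
shutil.copyfile('settings.cfg', 'settings.cfg.bak')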
def copy_file(self, infile, outfile, check=True):
    """Copy a file respecting dry-run and force flags.
    """
    self.ensure_dir(os.path.dirname(outfile))
    logger.info('Copying %s to %s', infile, outfile)
    if not self.dry_run:
        msg = None
        if check:
            if os.path.islink(outfile):
                msg = '%s is a symlink' % outfile
            elif os.path.exists(outfile) and not os.path.isfile(outfile):
                msg = '%s is a non-regular file' % outfile
            if msg:
                raise ValueError(msg + ' which would be overwritten')
        shutil.copyfile(infile, outfile)
        self.record_as_written(outfile)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """"Unpack" a directory, using the same interface as for archives

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    paths = {
        filename: ('', extract_dir),
    }
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        for d in dirs:
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            target = os.path.join(dst, f)
            target = progress_filter(src + f, target)
            if not target:
                # skip non-files
                continue
            ensure_directory(target)
            f = os.path.join(base, f)
            shutil.copyfile(f, target)
            shutil.copystat(f, target)
def create_params_file(self, fname):
    msg = QMessageBox()
    msg.setIcon(QMessageBox.Question)
    msg.setText("Parameter file %r not found, do you want SpyKING CIRCUS to "
                "create it for you?" % fname)
    msg.setWindowTitle("Generate parameter file?")
    msg.setInformativeText("This will create a parameter file from a "
                           "template file and open it in your system's "
                           "standard text editor. Fill properly before "
                           "launching the code. See the documentation "
                           "for details")
    msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
    answer = msg.exec_()
    if answer == QMessageBox.Yes:
        user_path = os.path.join(os.path.expanduser('~'), 'spyking-circus')
        if os.path.exists(user_path + 'config.params'):
            config_file = os.path.abspath(user_path + 'config.params')
        else:
            config_file = os.path.abspath(
                pkg_resources.resource_filename('circus', 'config.params'))
        shutil.copyfile(config_file, fname)
        self.params = fname
        self.last_log_file = fname.replace('.params', '.log')
        self.update_params()
def merge(bed1, bed2, bedOut):
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as f:
        bed_dict1 = tk_io.get_target_regions(f)
    with open(bed2) as f:
        bed_dict2 = tk_io.get_target_regions(f)
    for chrom in bed_dict2:
        for start, end in bed_dict2[chrom]:
            if chrom not in bed_dict1:
                bed_dict1[chrom] = tk_regions.Regions([])
            bed_dict1[chrom].add_region((start, end))
    writeOut(bed_dict1, bedOut)
def intersect(bed1, bed2, bedOut):
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as f:
        bed_dict1 = tk_io.get_target_regions(f)
    with open(bed2) as f:
        bed_dict2 = tk_io.get_target_regions(f)
    all_common_chroms = [chrom for chrom in bed_dict1.keys() if chrom in bed_dict2]
    bed_dict_intersect = {}
    for chrom in all_common_chroms:
        bed_dict_intersect[chrom] = bed_dict1[chrom].intersect(bed_dict2[chrom])
    writeOut(bed_dict_intersect, bedOut)
def overlap(bed1, bed2, bedOut):
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as f:
        bed_dict1 = tk_io.get_target_regions(f)
    with open(bed2) as f:
        bed_dict2 = tk_io.get_target_regions(f)
    bed_dict_overlap = {}
    for chrom in bed_dict1:
        if chrom not in bed_dict_overlap:
            bed_dict_overlap[chrom] = tk_regions.Regions([])
        for start, end in bed_dict1[chrom]:
            if chrom in bed_dict2 and \
               bed_dict2[chrom].overlaps_region(start, end):
                bed_dict_overlap[chrom].add_region((start, end))
    writeOut(bed_dict_overlap, bedOut)
def no_overlap(bed1, bed2, bedOut):
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as f:
        bed_dict1 = tk_io.get_target_regions(f)
    with open(bed2) as f:
        bed_dict2 = tk_io.get_target_regions(f)
    bed_dict_no_overlap = {}
    for chrom in bed_dict1:
        if chrom not in bed_dict_no_overlap:
            bed_dict_no_overlap[chrom] = tk_regions.Regions([])
        for start, end in bed_dict1[chrom]:
            if chrom not in bed_dict2 or \
               not bed_dict2[chrom].overlaps_region(start, end):
                bed_dict_no_overlap[chrom].add_region((start, end))
    writeOut(bed_dict_no_overlap, bedOut)
def subtract(bed1, bed2, bedOut):
    if not bed2:
        shutil.copyfile(bed1, bedOut)
        return
    with open(bed1) as f:
        bed_dict1 = tk_io.get_target_regions(f)
    with open(bed2) as f:
        bed_dict2 = tk_io.get_target_regions(f)
    bed_dict_subtract = {}
    for chrom in bed_dict1:
        if chrom not in bed_dict_subtract:
            bed_dict_subtract[chrom] = tk_regions.Regions([])
        for start, end in bed_dict1[chrom]:
            overlappings = []
            if chrom in bed_dict2:
                overlappings = bed_dict2[chrom].overlapping_regions(start, end)
            for interval in interval_subtract(start, end, overlappings):
                bed_dict_subtract[chrom].add_region(interval)
    writeOut(bed_dict_subtract, bedOut)
def check():
    """Check composition."""
    env = _get_vars("$HOME/.config/epiphyte/env")
    if os.path.exists(FILE_NAME):
        shutil.copyfile(FILE_NAME, PREV_FILE)
    compose(env)
    if os.path.exists(FILE_NAME):
        print(get_file_hash(FILE_NAME))
        output = None
        with open(FILE_NAME, 'r') as f:
            j = json.loads(f.read())
            output = json.dumps(j,
                                sort_keys=True,
                                indent=4,
                                separators=(',', ': '))
        with open(FILE_NAME, 'w') as f:
            f.write(output)
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Reformats sourcefile according to configfile and writes it to destfile.

    This method is only used for testing.
    """
    tmpdir = tempfile.mkdtemp(prefix='whatstyle_')
    cfg = os.path.join(tmpdir, self.configfilename)
    copyfile(configfile, cfg)
    tmpfilename = os.path.join(tmpdir, os.path.basename(sourcefile))
    copyfile(sourcefile, tmpfilename)
    cmdargs = [tmpfilename]
    exeresult = run_executable(self.exe, cmdargs)
    writebinary(destfile, exeresult.stdout)
    os.remove(tmpfilename)
    os.remove(cfg)
    os.rmdir(tmpdir)
def process_extract(extract):
    extract_file = os.path.join(target_dir, extract.extract + '.mbtiles')
    print('Create extract {}'.format(extract_file))

    # Instead of patching, copy over the patch source as target and
    # write directly to it (since that works concurrently).
    patch_src = args['--patch-from']
    if patch_src:
        print('Use patch from {} as base'.format(patch_src))
        shutil.copyfile(patch_src, extract_file)

    try:
        create_extract(extract, source_file, extract_file)
    except subprocess.CalledProcessError as e:
        # Failing extracts should not interrupt the entire process
        print(e, file=sys.stderr)
        return

    print('Update metadata {}'.format(extract_file))
    update_metadata(extract_file, extract.metadata(extract_file))
def _new_group(self, id_group, nbClusters):
    # generate filenames
    fetfilename = os.path.join(self.filename,
                               self.basename + ('.fet.%d' % id_group))
    clufilename = os.path.join(self.filename,
                               self.basename + ('.clu.%d' % id_group))

    # back up before overwriting
    if os.path.exists(fetfilename):
        shutil.copyfile(fetfilename, fetfilename + '~')
    if os.path.exists(clufilename):
        shutil.copyfile(clufilename, clufilename + '~')

    # create file handles
    self._fetfilehandles[id_group] = open(fetfilename, 'w')
    self._clufilehandles[id_group] = open(clufilename, 'w')

    # write out first line
    #self._fetfilehandles[id_group].write("0\n")  # Number of features
    self._clufilehandles[id_group].write("%d\n" % nbClusters)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """"Unpack" a directory, using the same interface as for archives

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % (filename,))

    paths = {filename: ('', extract_dir)}
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        for d in dirs:
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            name = src + f
            target = os.path.join(dst, f)
            target = progress_filter(src + f, target)
            if not target:
                continue    # skip non-files
            ensure_directory(target)
            f = os.path.join(base, f)
            shutil.copyfile(f, target)
            shutil.copystat(f, target)
def init(self):
    """Check home directory"""
    if not ConfigHandler.exists():
        ColorPrint.print_info(message="Default configuration initialized: " +
                                      str(StateHolder.config_file))
        if not os.path.exists(StateHolder.home_dir):
            os.mkdir(StateHolder.home_dir)
        if not os.path.exists(StateHolder.config_file):
            src_file = os.path.join(os.path.dirname(__file__), 'resources/config')
            shutil.copyfile(src=src_file, dst=StateHolder.config_file)
        StateHolder.config_parsed = False
    self.read()

    # check file type catalog
    for config in self.config:
        conf = self.config[config]
        if type(conf) is not dict:
            continue
        if conf.get("repositoryType", "file") == "file":
            FileUtils.make_empty_file_with_empty_dict(
                directory=StateHolder.home_dir,
                file=conf.get('file', 'poco-catalog.yml'))
def generate_test_list(self, verifier_repo_dir):
    logger.debug("Generating test case list...")
    if self.MODE == 'defcore':
        shutil.copyfile(
            conf_utils.TEMPEST_DEFCORE, conf_utils.TEMPEST_RAW_LIST)
    elif self.MODE == 'custom':
        if os.path.isfile(conf_utils.TEMPEST_CUSTOM):
            shutil.copyfile(
                conf_utils.TEMPEST_CUSTOM, conf_utils.TEMPEST_RAW_LIST)
        else:
            raise Exception("Tempest test list file %s NOT found."
                            % conf_utils.TEMPEST_CUSTOM)
    else:
        if self.MODE == 'smoke':
            testr_mode = "smoke"
        elif self.MODE == 'full':
            testr_mode = ""
        else:
            testr_mode = 'tempest.api.' + self.MODE
        cmd = ("cd {0};"
               "testr list-tests {1} > {2};"
               "cd -;".format(verifier_repo_dir,
                              testr_mode,
                              conf_utils.TEMPEST_RAW_LIST))
        ft_utils.execute_command(cmd)
def copy_dir_contents_with_overwrite(input_dir_name, output_dir_name):
    """Copy the contents of a directory into another, overwriting
    files if they exist."""
    # if output_dir_name isn't a location, make it so.
    if not os.path.exists(output_dir_name):
        os.makedirs(output_dir_name)

    dir_entries = os.listdir(input_dir_name)
    for dir_entry in dir_entries:
        input_path = os.path.join(input_dir_name, dir_entry)
        output_path = os.path.join(output_dir_name, dir_entry)
        if os.path.isdir(input_path):
            copy_dir_contents_with_overwrite(input_path, output_path)
        else:
            shutil.copyfile(input_path, output_path)
def copy_images_for_classification():
    ground_truth_dates = pickle.load(open(data_dir + 'ground_truth_dates.pickle', "rb"))
    ground_truth_dates = sorted(ground_truth_dates, key=lambda x: x[3], reverse=False)
    if not os.path.exists(classify_dir):
        os.mkdir(classify_dir)
    for seed_id, coin_id, result, labeled_date, bad_angle, bad_image in ground_truth_dates:
        if labeled_date < 1900:
            continue
        dir = crop_dir + str(coin_id / 100) + '/'
        new_dir = classify_dir + str(labeled_date) + '/'
        if not os.path.exists(new_dir):
            os.mkdir(new_dir)
        for image_id in range(0, 57):
            filename = str(coin_id).zfill(5) + str(image_id).zfill(2) + '.png'
            old_filename = dir + filename
            new_filename = new_dir + filename
            shutil.copyfile(old_filename, new_filename)
def create_single_lmdb(seed_image_id, filedata, test_id, multi_image_training=False,
                       images_per_angle=500, retraining=False):
    start_time = time.time()
    print 'create_single_lmdb for ' + str(seed_image_id)
    if retraining:
        weight_filename = 'snapshot_iter_16880.caffemodel'
        shutil.copyfile(train_dir + str(seed_image_id) + '/' + weight_filename,
                        train_dir + weight_filename)
    else:
        weight_filename = 'starting-weights.caffemodel'
        shutil.copyfile(weight_filename, train_dir + weight_filename)
    lmdb_dir = train_dir + str(seed_image_id) + '/'
    create_lmdb_rotate_whole_image.create_lmdbs(filedata, lmdb_dir,
                                                images_per_angle, -1, True, False)
    copy_train_files(lmdb_dir, multi_image_training)
    create_train_script(lmdb_dir, train_dir + weight_filename, multi_image_training)
    print 'Done in %s seconds' % (time.time() - start_time,)
def create_single_lmdb(seed_image_id, filedata, test_id, multi_image_training=False,
                       images_per_angle=500, retraining=False):
    start_time = time.time()
    print 'create_single_lmdb for ' + str(seed_image_id)
    if retraining:
        weight_filename = 'snapshot_iter_16880.caffemodel'
        shutil.copyfile(train_dir + str(seed_image_id) + '/' + weight_filename,
                        train_dir + weight_filename)
    else:
        weight_filename = 'starting-weights.caffemodel'
        shutil.copyfile(weight_filename, train_dir + weight_filename)
    lmdb_dir = train_dir + str(seed_image_id) + '/'
    create_lmdb_rotate_whole_image.create_lmdbs(filedata, lmdb_dir,
                                                images_per_angle, True, False)
    copy_train_files(lmdb_dir, multi_image_training)
    create_train_script(lmdb_dir, train_dir + weight_filename, multi_image_training)
    print 'Done in %s seconds' % (time.time() - start_time,)
def generate_report(pretty, json_file):
    ''' Generates a report from a static JSON result file '''
    # setup workspace
    ws = Workspace('test-files-marionette')
    ws_json_file = os.path.join(ws.make_tempdir(), json_file)

    # copy static json result file to workspace
    shutil.copyfile(os.path.join(TEST_DATA_PATH, json_file), ws_json_file)

    # create a random test name for decibel
    buff = []
    for i in range(10):
        buff.append(random.randint(0, 9))
    name = ''.join(['%d' % i for i in buff])

    # create time stamp for report
    time = datetime.now()
    start_time = time.strftime("%Y-%m-%d %H:%M:%S")
    end_time = (time + timedelta(seconds=1)).strftime("%Y-%m-%d %H:%M:%S")

    # try to generate report.

    # delete workspace.
    ws.delete()

    # check if decibel reported 200 (ok) in response.
    return name, start_time, end_time
def overwrite_plugin_configuration(source_binaries, project_fn):
    plugin_fn = _find_plugins_file(project_fn)
    if not plugin_fn:
        # logger.warn('Unable to overwrite plugins. No Plugin.xml found')
        return
    print(plugin_fn)

    if os.path.isfile(source_binaries):
        source_binaries = os.path.join(os.path.dirname(source_binaries),
                                       'RiverSystem.Forms.exe')
    source_version = '.'.join([str(v) for v in _get_version_number(source_binaries)[0:-1]])
    print(source_version)
    plugin_dir = os.path.join('C:\\', 'Users', os.environ['USERNAME'],
                              'AppData', 'Roaming', 'Source', source_version)
    if not os.path.exists(plugin_dir):
        os.makedirs(plugin_dir)
    plugin_dest_file = os.path.join(plugin_dir, 'Plugins.xml')
    shutil.copyfile(plugin_fn, plugin_dest_file)
def create(self, minecraft_version, forge_version, icon_path=None):
    self.logger.info("Creating MultiMC instance %s, Minecraft version %s, Forge version %s",
                     self.name, minecraft_version, forge_version)
    if os.path.exists(self.directory):
        errmsg = "MultiMC instance {} already exists".format(self.name)
        raise MultiMcInstanceExistsError(errmsg)
    os.makedirs(self.mods_directory)

    multimc_icon_key = None
    if icon_path is not None:
        multimc_icon_filename = "mccdl_" + os.path.basename(icon_path)
        multimc_icon_key = os.path.splitext(multimc_icon_filename)[0]
        shutil.copyfile(
            icon_path,
            os.path.join(self.instance_manager.multimc_directory, "icons",
                         multimc_icon_filename)
        )

    self.configure(minecraft_version, forge_version, icon_key=multimc_icon_key)
def copy_sestbl_procdef_atx(self):
    # copy process.defaults and sestbl.
    copyfile(self.GamitOpts['process_defaults'],
             os.path.join(self.pwd_tables, 'process.defaults'))
    copyfile(self.GamitOpts['atx'],
             os.path.join(self.pwd_tables, 'antmod.dat'))

    # change the scratch directory in the sestbl. file
    #copyfile(self.GamitOpts['sestbl'], os.path.join(self.pwd_tables, 'sestbl.'))
    with open(os.path.join(self.pwd_tables, 'sestbl.'), 'w') as sestbl:
        with open(self.GamitOpts['sestbl']) as orig_sestbl:
            for line in orig_sestbl:
                if 'Scratch directory' in line:
                    # empty means local directory!
                    sestbl.write('Scratch directory = \n')
                else:
                    sestbl.write(line)

    return
def __init__(self, db_session, db_migrate, sql_connection, sqlite_db,
             sqlite_clean_db):
    self.sql_connection = sql_connection
    self.sqlite_db = sqlite_db
    self.sqlite_clean_db = sqlite_clean_db

    self.engine = db_session.get_engine()
    self.engine.dispose()
    conn = self.engine.connect()
    if sql_connection == "sqlite://":
        self.setup_sqlite(db_migrate)
    else:
        testdb = os.path.join(CONF.state_path, sqlite_db)
        db_migrate.upgrade('head')
        if os.path.exists(testdb):
            return
    if sql_connection == "sqlite://":
        conn = self.engine.connect()
        self._DB = "".join(line for line in conn.connection.iterdump())
        self.engine.dispose()
    else:
        cleandb = os.path.join(CONF.state_path, sqlite_clean_db)
        shutil.copyfile(testdb, cleandb)
def __init__(self, db_api, sql_connection, sqlite_db, sqlite_clean_db):
    self.sql_connection = sql_connection
    self.sqlite_db = sqlite_db
    self.sqlite_clean_db = sqlite_clean_db

    self.engine = db_api.get_engine()
    self.engine.dispose()
    conn = self.engine.connect()
    if sql_connection == "sqlite://":
        self.setup_sqlite()
    elif sql_connection.startswith('sqlite:///'):
        testdb = paths.state_path_rel(sqlite_db)
        self.setup_sqlite()
    self.post_migrations()
    if sql_connection == "sqlite://":
        conn = self.engine.connect()
        self._DB = "".join(line for line in conn.connection.iterdump())
        self.engine.dispose()
    else:
        cleandb = paths.state_path_rel(sqlite_clean_db)
        try:
            shutil.copyfile(testdb, cleandb)
        except Exception:
            pass