def main(argv=None):
    """Launch one EC2 job per parameter set found in a settings module.

    The settings file named by ``FLAGS.ec2_settings`` is imported as a
    submodule of ``sandbox.rocky.tf.launchers`` (path separators become
    dots), copied next to itself as ``<FLAGS.exp>.py``, and each entry of its
    ``params`` attribute is merged over the optional ``base_params`` dict and
    handed to ``execute(..., mode="ec2")``.

    Parameters
    ----------
    argv : list[str], optional
        Accepted for compatibility with the caller's entry-point convention;
        it is not read here -- ``sys.argv`` is inspected instead.

    Notes
    -----
    Prints a usage line and exits with status 0 when fewer than two
    command-line arguments were supplied.  ``FLAGS`` and ``execute`` are
    expected to be defined elsewhere in this module.
    """
    if len(sys.argv) <= 2:
        print('Usage ./%s --exp=<exp_name> --ec2_settings=<relative_path_to_ec2_settings_file>'%sys.argv[0])
        sys.exit(0)

    import importlib
    import os.path as osp
    import shutil

    settings_path = FLAGS.ec2_settings

    # "dir/settings.py" -> "sandbox.rocky.tf.launchers.dir.settings"
    module_name = 'sandbox.rocky.tf.launchers.%s' % (
        osp.splitext(settings_path)[0].replace('/', '.'))
    mod = importlib.import_module(module_name)

    # Keep a copy of the settings under the experiment's name.
    dst_py = osp.join(osp.dirname(settings_path), FLAGS.exp + '.py')
    try:
        shutil.copy(settings_path, dst_py)
    except shutil.SameFileError as e:
        # The settings file already carries the experiment name; report it
        # and carry on.
        print(e)

    # A single parameter set is promoted to a one-element list.  isinstance()
    # (rather than an exact type comparison) lets list subclasses through
    # unchanged instead of wrapping them a second time.
    if not isinstance(mod.params, list):
        mod.params = [mod.params]
    base_params = getattr(mod, 'base_params', dict())

    launched = 0
    for params in mod.params:
        # Per-job values override the shared defaults; copy so the defaults
        # are never mutated between jobs.
        merged = base_params.copy()
        merged.update(params)
        launched += execute(params=merged, mode="ec2")
    print('Launched %d jobs.'%launched)
def setup_pfiles(tools):
    """
    Stage the parameter files of the given tools in the current working
    directory and prepend ``./`` to the ``PFILES`` environment variable.

    For every tool, ``paccess <tool>`` is queried for the parameter file
    path, ``punlearn <tool>`` is run, and the reported file is copied into
    the current directory.

    Parameters
    ----------
    tools : list[str]
        Names of the tools to set up

    Raises
    ------
    subprocess.CalledProcessError
        If ``paccess`` or ``punlearn`` exits with a non-zero status.
    KeyError
        If ``PFILES`` is not set in the environment.
    """
    for name in tools:
        raw_path = subprocess.check_output(["paccess", name])
        param_file = raw_path.decode("utf-8").strip()
        subprocess.check_call(["punlearn", name])
        try:
            shutil.copy(param_file, ".")
        except shutil.SameFileError:
            # The parameter file already lives in the working directory.
            pass

    # Put the working directory in front of the existing ``PFILES`` value
    previous = os.environ["PFILES"]
    os.environ["PFILES"] = "./:" + previous
def test_dont_copy_file_onto_symlink_to_itself(self):
    """copyfile() must refuse to copy a file onto a symlink to it (bug 851123)."""
    os.mkdir(TESTFN)
    source_path = os.path.join(TESTFN, 'cheese')
    link_path = os.path.join(TESTFN, 'shop')
    try:
        with open(source_path, 'w') as fh:
            fh.write('cheddar')
        # The link target is the bare name 'cheese': using the source path
        # here would make the symlink point at TESTFN/TESTFN/cheese, while
        # it should point at TESTFN/cheese.
        os.symlink('cheese', link_path)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(source_path, link_path)
        # The rejected copy must leave the source content intact.
        with open(source_path, 'r') as fh:
            content = fh.read()
        self.assertEqual(content, 'cheddar')
        os.remove(link_path)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def update_dict_files(path, shared_file_path):
    """Refresh the support files inside the dictionary directory ``path``.

    One existing entry of ``path`` is deleted, then a fixed set of shared
    FreeDict files from ``shared_file_path`` and every entry of the
    ``template`` directory located next to this module are copied into
    ``path`` with ``shutil.copy2``.  Sources that already are the
    destination file are skipped silently.

    Parameters
    ----------
    path : str
        Target directory to update.
    shared_file_path : str
        Directory containing the shared files.
    """
    template_dir = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'template'
    )

    # NOTE(review): this deletes the first entry os.listdir() returns,
    # whatever its name; the original variable was called `tei`, so a filter
    # on TEI files may have been intended -- confirm.
    try:
        existing = os.listdir(path)
        if existing:
            os.remove(os.path.join(path, existing[0]))
    except FileNotFoundError:
        # Missing target directory (or entry): nothing to delete.
        pass

    def copy_into_target(sources):
        # Copy each source into `path`, ignoring files that are already there.
        for source in sources:
            try:
                shutil.copy2(source, path)
            except shutil.SameFileError:
                pass

    shared_names = (
        'freedict-dictionary.css',
        'freedict-P5.dtd',
        'INSTALL',
        'freedict-P5.rng',
        'freedict-P5.xml',
    )
    copy_into_target(
        [os.path.join(shared_file_path, name) for name in shared_names]
    )
    # The template directory is listed only after the shared files are done.
    copy_into_target(
        [os.path.join(template_dir, name) for name in os.listdir(template_dir)]
    )
def test_dont_copy_file_onto_link_to_itself(self):
    """copyfile() must refuse to copy a file onto a hard link to it (bug 851123)."""
    os.mkdir(TESTFN)
    source_path = os.path.join(TESTFN, 'cheese')
    link_path = os.path.join(TESTFN, 'shop')
    try:
        with open(source_path, 'w') as fh:
            fh.write('cheddar')
        os.link(source_path, link_path)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(source_path, link_path)
        # The rejected copy must leave the source content intact.
        with open(source_path, 'r') as fh:
            content = fh.read()
        self.assertEqual(content, 'cheddar')
        os.remove(link_path)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
def test_copyfile_same_file(self):
    """copyfile() raises SameFileError when source and destination coincide."""
    scratch_dir = self.mkdtemp()
    same_path = os.path.join(scratch_dir, 'foo')
    write_file(same_path, 'foo')
    with self.assertRaises(SameFileError):
        shutil.copyfile(same_path, same_path)
    # Catching the older, broader Error must keep working as well, to stay
    # backward compatible.
    with self.assertRaises(Error):
        shutil.copyfile(same_path, same_path)
def test_module_all_attribute(self):
    """shutil.__all__ must list exactly the expected public names."""
    self.assertTrue(hasattr(shutil, '__all__'))
    expected_names = {
        'copyfileobj', 'copyfile', 'copymode', 'copystat',
        'copy', 'copy2', 'copytree', 'move', 'rmtree',
        'Error', 'SpecialFileError', 'ExecError',
        'make_archive', 'get_archive_formats',
        'register_archive_format', 'unregister_archive_format',
        'get_unpack_formats', 'register_unpack_format',
        'unregister_unpack_format', 'unpack_archive',
        'ignore_patterns', 'chown', 'which', 'get_terminal_size',
        'SameFileError',
    }
    # disk_usage is only expected where os.statvfs exists or on Windows.
    if hasattr(os, 'statvfs') or os.name == 'nt':
        expected_names.add('disk_usage')
    self.assertEqual(set(shutil.__all__), expected_names)
def merge_vcfs(input_dir, output_dir, project_name, raw_vcf_path_list=None, vcfs_gzipped=False):
    """Produce a single VCF in ``output_dir`` from one or more input VCFs.

    With several inputs, each is passed through ``bgzip_and_index_vcf`` and
    the results are merged into ``<output_dir>/<project_name><VCF_EXTENSION>``.
    With exactly one input, that file is copied into ``output_dir`` under its
    own name; no merging, compression or indexing is done on that path.

    Parameters
    ----------
    input_dir : str
        Directory searched for VCFs when ``raw_vcf_path_list`` is None.
    output_dir : str
        Directory receiving the resulting file.
    project_name : str
        Base name of the merged output file.
    raw_vcf_path_list : list[str], optional
        Explicit input paths; when given, ``input_dir`` is not searched.
    vcfs_gzipped : bool
        Selects the bgzipped extension for the directory search.

    Returns
    -------
    str
        Path of the merged (or copied) VCF.

    Raises
    ------
    ValueError
        If no input VCF is available.
    """
    if raw_vcf_path_list is not None:
        if len(raw_vcf_path_list) == 0:
            raise ValueError("Input list of VCF files is empty.")
    else:
        if vcfs_gzipped:
            extension = BGZIPPED_VCF_EXTENSION
        else:
            extension = VCF_EXTENSION
        raw_vcf_path_list = _get_vcf_file_paths_list_in_directory(input_dir, extension)
        if len(raw_vcf_path_list) == 0:
            raise ValueError("No VCFs found with extension '{0}'.".format(extension))

    if len(raw_vcf_path_list) > 1:
        # A set drops duplicate paths returned for the inputs.
        compressed_paths = {bgzip_and_index_vcf(vcf_path) for vcf_path in raw_vcf_path_list}
        merged_path = os.path.join(output_dir, project_name + VCF_EXTENSION)
        _merge_bgzipped_indexed_vcfs(compressed_paths, merged_path)
        return merged_path

    # Exactly one input: copy it to the output directory, keeping its name.
    only_input = raw_vcf_path_list[0]
    copied_path = os.path.join(output_dir, os.path.basename(only_input))
    try:
        shutil.copyfile(only_input, copied_path)
    except shutil.SameFileError:
        # Input and output directories coincide, so the file is already
        # where it needs to be; copying a file onto itself is not an error
        # for our purposes.
        pass
    return copied_path
def _parse_argument(self, scenario, key: str, value):
    """Return the value that should be written to the scenario file for ``key``.

    Most values pass through untouched.  The exceptions are the target
    algorithm ``'ta'``, which is joined back into a single string, the
    in-memory attributes that are recreated from files and therefore not
    written, and the four file-name keys, whose files are copied to (or
    created in) the output directory of this run.

    Parameters
    ----------
    scenario : Scenario
        Scenario being written
    key : str
        Name of the scenario attribute
    value : Any
        Current value of that attribute

    Returns
    -------
    Any
        The value to write: a path inside the output directory for the
        file-name keys (None if there is nothing to write), the joined
        command string for ``'ta'``, None for attributes that are not
        written, otherwise ``value`` itself.

    Side effects
    ------------
    For ``pcs_fn``, ``train_inst_fn``, ``test_inst_fn`` and ``feature_fn``
    the referenced file is copied into ``scenario.output_dir_for_this_run``
    when it exists; otherwise the file is generated there from the
    corresponding scenario attribute, if that attribute holds data.
    """
    file_keys = ('pcs_fn', 'train_inst_fn', 'test_inst_fn', 'feature_fn')

    if key not in file_keys:
        if key == 'ta' and value is not None:
            # Undo the shlex.split applied to 'ta' when it was read.
            return " ".join(value)
        if key in ('train_insts', 'test_insts', 'cs', 'feature_dict'):
            # Not written out: these are recreated from the files.
            return None
        return value

    # An existing file is copied as-is.
    if value is not None and os.path.isfile(value):
        try:
            return shutil.copy(value, scenario.output_dir_for_this_run)
        except shutil.SameFileError:
            # The file already sits in the output directory.
            return value

    # No usable file: write one from the scenario attribute, if it has data.
    if key == 'pcs_fn' and scenario.cs is not None:
        written_path = os.path.join(scenario.output_dir_for_this_run, "configspace.pcs")
        self.write_pcs_file(scenario.cs, written_path)
        return written_path
    if key == 'train_inst_fn' and scenario.train_insts != [None]:
        written_path = os.path.join(scenario.output_dir_for_this_run, 'train_insts.txt')
        self.write_inst_file(scenario.train_insts, written_path)
        return written_path
    if key == 'test_inst_fn' and scenario.test_insts != [None]:
        written_path = os.path.join(scenario.output_dir_for_this_run, 'test_insts.txt')
        self.write_inst_file(scenario.test_insts, written_path)
        return written_path
    if key == 'feature_fn' and scenario.feature_dict != {}:
        written_path = os.path.join(scenario.output_dir_for_this_run, 'features.txt')
        self.write_inst_features_file(scenario.n_features, scenario.feature_dict, written_path)
        return written_path
    return None