我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用filecmp.cmp()。
def testCopyFile(self):
    """Check that CopyFile produces a destination that did not exist beforehand."""
    expected_content = "this is test content."
    with tempfile.TemporaryDirectory() as workdir:
        source = os.path.join(workdir, self.file)
        # The "copy" sub-directory is never created by this test itself.
        destination = os.path.join(workdir, "copy", self.file)
        with open(source, "a") as stream:
            stream.write(expected_content)

        copier = file_handler.FileHandler()
        self.assertFalse(os.path.exists(destination))
        copier.CopyFile(source, destination)

        self.assertTrue(os.path.exists(destination))
        self.assertTrue(filecmp.cmp(destination, source))
def true_duplicates(files):
    """Yield groups of paths from *files* whose contents are identical.

    The first remaining path is used as the reference; every other path is
    compared against it byte-by-byte (``shallow=False``).  Paths that match
    join the reference's group, the rest are carried over to the next round.
    Only groups with at least two members are yielded.
    """
    remaining = files
    while len(remaining) > 1:
        reference = remaining[0]
        matches = [reference]
        leftovers = []
        for candidate in remaining[1:]:
            bucket = matches if filecmp.cmp(reference, candidate, False) else leftovers
            bucket.append(candidate)
        if len(matches) > 1:
            yield matches
        remaining = leftovers
def test_file_to_file_decrypt_required_encryption_context_success(tmpdir, required_encryption_context):
    """Encrypt then decrypt a random file, adding ``--encryption-context`` to the decrypt call."""
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(source=str(plaintext), target=str(ciphertext))
    decrypt_command = decrypt_args_template().format(source=str(ciphertext), target=str(decrypted))
    decrypt_command += ' --encryption-context ' + required_encryption_context

    # Encrypt first, then decrypt what was just produced.
    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_file_cycle(tmpdir):
    """Encrypt a random 1024-byte file, decrypt it, and expect the original bytes back."""
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(source=str(plaintext), target=str(ciphertext))
    decrypt_command = decrypt_args_template().format(source=str(ciphertext), target=str(decrypted))

    # Encrypt first, then decrypt what was just produced.
    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_file_cycle_target_through_symlink(tmpdir):
    """Round-trip a random file where the ciphertext path goes through a symlinked directory."""
    plaintext = tmpdir.join('source_plaintext')
    output_dir = tmpdir.mkdir('output')
    # ``output_link`` points at ``output``; the ciphertext is addressed via the link.
    os.symlink(str(output_dir), str(tmpdir.join('output_link')))
    ciphertext = tmpdir.join('output_link', 'ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(source=str(plaintext), target=str(ciphertext))
    decrypt_command = decrypt_args_template().format(source=str(ciphertext), target=str(decrypted))

    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_file_cycle_with_caching(tmpdir):
    """Round-trip a random file, building the encrypt arguments with ``caching=True``."""
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template(caching=True).format(
        source=str(plaintext),
        target=str(ciphertext),
    )
    decrypt_command = decrypt_args_template().format(source=str(ciphertext), target=str(decrypted))

    for command in (encrypt_command, decrypt_command):
        aws_encryption_sdk_cli.cli(shlex.split(command, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_dir_cycle(tmpdir):
    """Encrypt a file into a directory target, then decrypt the file created there."""
    inner_dir = tmpdir.mkdir('inner')
    plaintext = tmpdir.join('source_plaintext')
    # The test expects the encrypted output at <inner>/source_plaintext.encrypted.
    ciphertext = inner_dir.join('source_plaintext.encrypted')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_command = encrypt_args_template().format(source=str(plaintext), target=str(inner_dir))
    decrypt_command = decrypt_args_template().format(source=str(ciphertext), target=str(decrypted))

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_command, posix=not is_windows()))
    assert os.path.isfile(str(ciphertext))

    aws_encryption_sdk_cli.cli(shlex.split(decrypt_command, posix=not is_windows()))
    assert filecmp.cmp(str(plaintext), str(decrypted))
def validate(self, pid):
    """Compare the produced output file with the key file for a problem.

    Args:
        pid: The id of the problem; selects ``env/key/key<pid>.txt``.

    Returns:
        "AC" (accepted) when ``tmp/output.txt`` compares equal to the key,
        "WA" (wrong answer) otherwise.
    """
    produced = self.cwd + "/tmp/output.txt"
    expected = self.cwd + "/env/key/key" + str(pid) + ".txt"
    return "AC" if filecmp.cmp(produced, expected) else "WA"
def delete_identical_files():
    """Remove downloaded files that are byte-identical to the last file kept.

    Walks every non-hidden directory listed in ``download_path``.  Inside each
    one, every file is compared (full content, ``shallow=False``) with the
    previously kept file; the comparison chain for each folder starts from
    ``./main.py``.  A file equal to its predecessor is deleted, otherwise it
    becomes the new reference.

    Progress is reported on stdout using ``print(...)`` with a single
    pre-formatted string, which produces the same text on Python 2 and 3.
    """
    for battle_folder in os.listdir(download_path):
        battle_folder_path = os.path.join(os.curdir, download_folder_name, battle_folder)
        if battle_folder.startswith('.') or not os.path.isdir(battle_folder_path):
            continue

        # NOTE(review): os.listdir() order is arbitrary, so only files that
        # happen to be adjacent in that order are detected as duplicates.
        previous_battle_file = os.path.join(os.curdir, "main.py")
        for battle_file in os.listdir(battle_folder_path):
            current_battle_file = os.path.join(battle_folder_path, battle_file)
            if filecmp.cmp(previous_battle_file, current_battle_file, shallow=False):
                os.remove(current_battle_file)
                print("%s Deleted, copy of another file" % current_battle_file)
            else:
                previous_battle_file = current_battle_file
    print("Identical file search complete.")
def compare_files(file1, file2, log):
    """Report whether two files compare equal.

    Uses ``filecmp.cmp`` with its default (shallow) mode.  When the files are
    considered equal, an error message is sent to ``log.error`` and echoed to
    stdout.

    Args:
        file1 (string): Path of first file to compare
        file2 (string): Path of second file to compare
        log (:obj:`Logger`): Object whose ``error`` method receives the message.

    Returns:
        boolean: True if they seem equal, False otherwise
    """
    if not filecmp.cmp(file1, file2):
        return False

    msg = "Two MAC Address Table Files Are Identical! '%s' & '%s'" % (file1, file2)
    log.error(msg)
    print("Error: " + msg)
    return True
def _sorted_unique_jdk_configs(configs):
    """Return *configs* de-duplicated by ``home`` and sorted best-first.

    Only the first config seen for each ``home`` is kept.  The result is
    sorted in descending order of ``_compare_configs``: entries equal to
    ``_default_java_home`` rank highest, then entries found in
    ``_extra_java_homes``, and remaining ties are broken by
    ``VersionSpec.__cmp__`` on the ``version`` attributes.

    ``sorted(..., cmp=...)`` exists only on Python 2; ``functools.cmp_to_key``
    gives the same ordering on Python 2.7 and Python 3.
    """
    from functools import cmp_to_key  # local import keeps the block self-contained

    path_seen = set()
    unique_configs = []
    for config in configs:
        if config.home not in path_seen:
            path_seen.add(config.home)
            unique_configs.append(config)

    def _compare_configs(c1, c2):
        # NOTE(review): configs are compared directly against the java-home
        # globals here (not via ``.home``) -- confirm that is intended.
        if c1 == _default_java_home:
            if c2 != _default_java_home:
                return 1
        elif c2 == _default_java_home:
            return -1
        if c1 in _extra_java_homes:
            if c2 not in _extra_java_homes:
                return 1
        elif c2 in _extra_java_homes:
            return -1
        return VersionSpec.__cmp__(c1.version, c2.version)

    return sorted(unique_configs, key=cmp_to_key(_compare_configs), reverse=True)
def test_single_conflict_unsolved(self):
    """Tests a walker against a file with a single conflict, without solving it"""
    # Given a file to merge
    conflict_file = CW_PATH.format('single_conflict')
    walker = ConflictsWalker(conflict_file, 'test', REPORT_NONE, False)

    # When walking the conflicts
    self.assertTrue(walker.has_more_conflicts())
    self.assertFalse(walker.has_more_conflicts())
    walker.end(False)

    # Then the merged output equals the untouched input and conflicts remain
    merged_matches_input = filecmp.cmp(walker.merged, conflict_file)
    self.assertTrue(merged_matches_input)
    self.assertEqual(walker.get_merge_status(), ERROR_CONFLICTS)
    os.remove(walker.merged)
def test_single_conflict_rewritten(self):
    """Tests a walker against a file with a single conflict, rewriting it without resolving it"""
    # Given a file to merge
    conflict_file = CW_PATH.format('single_conflict')
    expected_file = CW_PATH.format('single_conflict_resolved')
    walker = ConflictsWalker(conflict_file, 'test', REPORT_NONE, False)

    # When walking the conflicts and rewriting the only one
    self.assertTrue(walker.has_more_conflicts())
    walker.next_conflict().rewrite(RESOLUTION)
    self.assertFalse(walker.has_more_conflicts())
    walker.end(False)

    # Then the content matches the resolved file but the status still reports conflicts
    self.assertTrue(filecmp.cmp(walker.merged, expected_file))
    self.assertEqual(walker.get_merge_status(), ERROR_CONFLICTS)
    os.remove(walker.merged)
def test_single_conflict_solved(self):
    """Tests a walker against a file with a single conflict, and solving it"""
    # Given a file to merge
    conflict_file = CW_PATH.format('single_conflict')
    expected_file = CW_PATH.format('single_conflict_resolved')
    walker = ConflictsWalker(conflict_file, 'test', REPORT_NONE, False)

    # When walking the conflicts and resolving the only one
    self.assertTrue(walker.has_more_conflicts())
    walker.next_conflict().resolve(RESOLUTION)
    self.assertFalse(walker.has_more_conflicts())
    walker.end(False)

    # Then the content matches the resolved file and the merge succeeded
    self.assertTrue(filecmp.cmp(walker.merged, expected_file))
    self.assertEqual(walker.get_merge_status(), SUCCESS)
    os.remove(walker.merged)
def test02_transform():
    """Run ``transform`` with an explicit column list and compare every output format."""
    base_output = "output_fctl_data_dict"
    extensions = ['html', 'xlsx', 'tsv', 'md']
    outputs = {ext: "{}.{}".format(base_output, ext) for ext in extensions}
    expected_base = os.path.join(TEST_DIR, 'resources', 'expected', 'data_dict')

    columns = ['Field_compact_name', 'Field_name', 'Full_name', 'Description',
               'Count', 'Percentage', 'Types_count']
    argv = ['transform', SCHEMA_FILE, '--output', base_output, '--columns']
    argv += columns + ['--formats'] + extensions
    main(argv)

    # tsv and md must match the expected files as-is
    for ext in ('tsv', 'md'):
        assert filecmp.cmp(outputs[ext], "{}.{}".format(expected_base, ext))

    # html is compared with all spaces stripped
    with open(outputs['html']) as out_fd, open("{}.html".format(expected_base)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')

    # xlsx is compared cell by cell on the active sheet
    actual_cells = [cell.value
                    for row in load_workbook(outputs['xlsx']).active
                    for cell in row]
    expected_cells = [cell.value
                      for row in load_workbook("{}.xlsx".format(expected_base)).active
                      for cell in row]
    assert actual_cells == expected_cells

    for output in outputs.values():
        os.remove(output)
def test04_transform_default_cols():
    """Run ``transform`` without ``--columns`` and compare every output format."""
    base_output = "output_fctl_data_dict_default"
    extensions = ['html', 'xlsx', 'tsv', 'md']
    outputs = {ext: "{}.{}".format(base_output, ext) for ext in extensions}
    expected_base = os.path.join(TEST_DIR, 'resources', 'expected', 'data_dict_default')

    argv = ['transform', SCHEMA_FILE, '--output', base_output, '--formats'] + extensions
    main(argv)

    # tsv and md must match the expected files as-is
    for ext in ('tsv', 'md'):
        assert filecmp.cmp(outputs[ext], "{}.{}".format(expected_base, ext))

    # html is compared with all spaces stripped
    with open(outputs['html']) as out_fd, open("{}.html".format(expected_base)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')

    # xlsx is compared cell by cell on the active sheet
    actual_cells = [cell.value
                    for row in load_workbook(outputs['xlsx']).active
                    for cell in row]
    expected_cells = [cell.value
                      for row in load_workbook("{}.xlsx".format(expected_base)).active
                      for cell in row]
    assert actual_cells == expected_cells

    for output in outputs.values():
        os.remove(output)
def test06_compare():
    """Run ``compare`` against a second schema and check every output format."""
    base_output = "output_fctl_diff"
    extensions = ['html', 'xlsx', 'tsv', 'md']
    outputs = {ext: "{}.{}".format(base_output, ext) for ext in extensions}
    expected_base = os.path.join(TEST_DIR, 'resources', 'functional', 'expected', 'diff')
    exp_schema = os.path.join(TEST_DIR, 'resources', 'input', 'test_schema2.json')

    argv = ['compare', SCHEMA_FILE, exp_schema, '--output', base_output, '--formats']
    argv += extensions
    main(argv)

    # tsv and md must match the expected files as-is
    for ext in ('tsv', 'md'):
        assert filecmp.cmp(outputs[ext], "{}.{}".format(expected_base, ext))

    # html is compared with all spaces stripped
    with open(outputs['html']) as out_fd, open("{}.html".format(expected_base)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')

    # xlsx is compared cell by cell on the active sheet
    actual_cells = [cell.value
                    for row in load_workbook(outputs['xlsx']).active
                    for cell in row]
    expected_cells = [cell.value
                      for row in load_workbook("{}.xlsx".format(expected_base)).active
                      for cell in row]
    assert actual_cells == expected_cells

    for output in outputs.values():
        os.remove(output)
def test07_compare_detailed():
    """Run ``compare --detailed_diff`` against a second schema and check every output format."""
    base_output = "output_fctl_detailed_diff"
    extensions = ['html', 'xlsx', 'tsv', 'md']
    outputs = {ext: "{}.{}".format(base_output, ext) for ext in extensions}
    expected_base = os.path.join(TEST_DIR, 'resources', 'functional', 'expected', 'detailed_diff')
    exp_schema = os.path.join(TEST_DIR, 'resources', 'input', 'test_schema2.json')

    argv = ['compare', SCHEMA_FILE, exp_schema, '--output', base_output,
            '--detailed_diff', '--formats']
    argv += extensions
    main(argv)

    # tsv and md must match the expected files as-is
    for ext in ('tsv', 'md'):
        assert filecmp.cmp(outputs[ext], "{}.{}".format(expected_base, ext))

    # html is compared with all spaces stripped
    with open(outputs['html']) as out_fd, open("{}.html".format(expected_base)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')

    # xlsx is compared cell by cell on the active sheet
    actual_cells = [cell.value
                    for row in load_workbook(outputs['xlsx']).active
                    for cell in row]
    expected_cells = [cell.value
                      for row in load_workbook("{}.xlsx".format(expected_base)).active
                      for cell in row]
    assert actual_cells == expected_cells

    for output in outputs.values():
        os.remove(output)
def test_all(self):
    """Read two group files, check their first record, then write one back and compare."""
    # First file: exactly one record
    expected_r = GroupRecord(name="group1", members=["member0", "member1", "member2"])
    with GroupReader(GROUP_FN_1) as reader:
        records = list(reader)
    self.assertEqual(len(records), 1)
    self.assertEqual(records[0], expected_r)

    # Second file: 51 records, only the first one is checked in detail
    members = (
        "i0_HQ_sampleb92221|c8319/f2p0/463,"
        "i0_HQ_sampleb92221|c28/f4p0/460,"
        "i0_HQ_sampleb92221|c524/f2p0/462,"
        "i0_HQ_sampleb92221|c539/f2p0/460,"
        "i0_HQ_sampleb92221|c7864/f22p0/462,"
        "i0_HQ_sampleb92221|c7959/f2p0/461,"
        "i0_HQ_sampleb92221|c8090/f3p0/462,"
        "i0_HQ_sampleb92221|c8099/f3p0/459,"
        "i0_HQ_sampleb92221|c8136/f2p0/461,"
        "i0_HQ_sampleb92221|c428/f2p0/459"
    ).split(',')
    expected_r = GroupRecord(name="PB.1.1", members=members)
    with GroupReader(GROUP_FN_2) as reader:
        records = list(reader)
    self.assertEqual(len(records), 51)
    self.assertEqual(records[0], expected_r)

    # Writing the records back must reproduce the second input file
    out_fn = op.join(OUT_DIR, "test_GroupWriter.txt")
    with GroupWriter(out_fn) as writer:
        for record in records:
            writer.writeRecord(record)
    self.assertTrue(filecmp.cmp(out_fn, GROUP_FN_2))
def test_write(self):
    """Test ClusterSummary.write for the .txt and .json outputs."""
    summary = ClusterSummary()
    summary.num_consensus_isoforms = 97
    summary.num_total_bases = 97 * 3945

    # Text report: must equal the stored expected file
    txt_out = op.join(self.testDir, "out/test_ClusterSummary.txt")
    txt_expected = op.join(self.testDir, "stdout/test_ClusterSummary.txt")
    summary.write(txt_out)
    self.assertTrue(filecmp.cmp(txt_out, txt_expected))

    # JSON report: version-stripped copies are written next to the output,
    # but the comparison below is made on the unstripped files via
    # _compare_reports (the tmp1/tmp2 files are not compared here).
    json_out = op.join(self.testDir, "out/test_ClusterSummary.json")
    json_expected = op.join(self.testDir, "stdout/test_ClusterSummary.json")
    summary.write(json_out)
    rm_version_string(json_out, json_out + "tmp1")
    rm_version_string(json_expected, json_out + "tmp2")
    _compare_reports(self, json_out, json_expected)
def test_concatenate_sam(self):
    """Test concatenate_sam(in_sam_files, out_sam).

    Everything except ``@PG`` header lines must equal the expected SAM file,
    and the ``@PG`` IDs in the concatenated output must be unique.
    Files are read through ``with`` blocks so their handles are closed.
    """
    in_sam_files = [op.join(_SIV_DIR_, f) for f in ["chunk0.sam", "chunk1.sam"]]
    out_sam = op.join(_OUT_DIR_, 'test concatenated.sam')
    expected_sam = op.join(_SIV_DIR_, "sorted-gmap-output.sam")
    concatenate_sam(in_sam_files, out_sam)

    self.assertTrue(op.exists(out_sam))
    self.assertTrue(op.exists(expected_sam))

    with open(out_sam, 'r') as handle:
        out_lines = list(handle)
    with open(expected_sam, 'r') as handle:
        expected_lines = list(handle)

    # test everything other than @PG are identical
    out = [l for l in out_lines if not l.startswith('@PG')]
    exp = [l for l in expected_lines if not l.startswith('@PG')]
    self.assertEqual(out, exp)

    # chunk01.sam and chunk02.sam has identical PG ID in their SAM headers
    # test concatenated @PG IDs are not conflicting
    pg_ids = [field[3:]
              for pg in out_lines if pg.startswith('@PG')
              for field in pg.split('\t') if field.startswith('ID:')]
    self.assertEqual(len(pg_ids), len(set(pg_ids)))
    self.assertEqual(len(pg_ids), 2)
def prepare_issfile(self):
    """Rewrite the .iss file in place by passing each line through ``replace_lines``.

    The transformed content is written to a temporary file first.  If it
    differs from the original (full content comparison), the original is
    replaced by the temporary file; otherwise the temporary file is removed
    and the original is left untouched.

    Both file handles are managed by ``with`` so they are closed even when
    reading, writing or ``replace_lines`` raises, and the temporary file is
    removed in that case instead of being left behind.
    """
    ifilepath = self.getissfile()
    ofilehandle, ofilepath = tempfile.mkstemp()
    try:
        # os.fdopen wraps the descriptor returned by mkstemp in a file object.
        with os.fdopen(ofilehandle, 'w') as ofile, open(ifilepath) as ifile:
            for line in ifile:
                ofile.write(self.replace_lines(line))
    except Exception:
        os.remove(ofilepath)
        raise

    if filecmp.cmp(ifilepath, ofilepath, shallow=False):
        os.remove(ofilepath)  # nothing changed: drop the temp file
    else:
        os.remove(ifilepath)  # remove original file
        shutil.move(ofilepath, ifilepath)  # move new file into its place
def get_diff_list(left, right, ignore=None):
    """Work out which relative paths differ between two directory trees.

    Args:
        left: Root of the tree being compared.
        right: Root of the tree it is compared against.
        ignore: Names handed to ``get_all_paths`` as its ``ignore`` argument.
            Defaults to ``['.DS_Store', 'pymakr.conf']``; a fresh list is
            built on every call so the default is never shared.

    Returns:
        A tuple ``(to_delete, new_files, to_update)``: paths only in *left*,
        paths only in *right*, and a list of common paths whose contents
        differ (full comparison, ``shallow=False``).
    """
    if ignore is None:
        ignore = ['.DS_Store', 'pymakr.conf']

    left_paths = get_all_paths(left, ignore=ignore)
    right_paths = get_all_paths(right, ignore=ignore)

    new_files = right_paths.difference(left_paths)
    to_delete = left_paths.difference(right_paths)
    common = left_paths.intersection(right_paths)

    to_update = [
        f for f in common
        if not filecmp.cmp(os.path.join(left, f), os.path.join(right, f), shallow=False)
    ]
    return (to_delete, new_files, to_update)


# Searches the current working directory for a file starting with "firmware_"
# followed by a version number higher than `current_ver` as per LooseVersion.
# Returns None if such a file does not exist.
# Parameters
# path - the path to the directory to be searched
# current_ver - the result must be higher than this version
#
def _check_file_equality(self, fn_1, fn_2, ext): # gz files contain mtime and filename in the header that # causes filecmp to return False even if contents are identical # Hence decompress to test for equality if(ext == '.gz'): with gzip.GzipFile(fn_1, 'rb') as f_1,\ NamedTemporaryFile(mode='wb') as f_txt_1,\ gzip.GzipFile(fn_2, 'rb') as f_2,\ NamedTemporaryFile(mode='wb') as f_txt_2: shutil.copyfileobj(f_1, f_txt_1) shutil.copyfileobj(f_2, f_txt_2) f_txt_1.flush() f_txt_2.flush() return filecmp.cmp(f_txt_1.name, f_txt_2.name, shallow=False) else: return filecmp.cmp(fn_1, fn_2, shallow=False)
def test_uncompress_file(self):
    """Check uncompress_file rejects .txt and round-trips the .gz and .bz2 fixtures."""
    # Testing txt file type
    self.assertRaisesRegexp(
        NotImplementedError,
        "^Received .txt format. Only gz and bz2.*",
        compression.uncompress_file,
        input_file_name=None,
        file_extension='.txt',
        dest_dir=None,
    )

    # Testing gz and bz2 file types against the plain-text reference
    fn_txt = self._get_fn('.txt')
    for extension in ('.gz', '.bz2'):
        compressed = self._get_fn(extension)
        uncompressed = compression.uncompress_file(compressed, extension, self.tmp_dir)
        self.assertTrue(filecmp.cmp(uncompressed, fn_txt, shallow=False),
                        msg="Uncompressed file doest match original")
def compare_featurizer_class(featurizer, scaled_size, featurized_data, downsample_size,
                             image_column_headers, automatic_downsample, csv_path,
                             image_list, depth, featurized=False):
    """Check the necessary assertions for a featurizer image.

    Every attribute of *featurizer* is compared with the matching argument;
    ``features`` is compared with ``np.allclose`` using ``ATOL``.  When
    *featurized* is true, the ``<csv_path>_full`` file must equal the
    reference CSV for the featurizer's model, and ``full_dataframe`` must
    equal that CSV loaded through pandas.
    """
    assert featurizer.scaled_size == scaled_size
    assert np.allclose(featurizer.features, featurized_data, atol=ATOL)
    assert featurizer.downsample_size == downsample_size
    assert featurizer.image_column_headers == image_column_headers
    assert featurizer.auto_sample == automatic_downsample
    assert featurizer.csv_path == csv_path
    assert featurizer.image_list == image_list
    assert featurizer.depth == depth

    if featurized:
        check_csv = CHECK_CSV.format(featurizer.model_name)
        assert filecmp.cmp('{}_full'.format(csv_path), check_csv)
        # ``df == other`` yields an element-wise frame whose truth value is
        # ambiguous, so asserting on it raises instead of comparing.
        # NOTE(review): assumes full_dataframe is a pandas DataFrame -- confirm.
        assert featurizer.full_dataframe.equals(pd.read_csv(check_csv))
def test_from_biotools_to_galaxy(self, name, json_path, xml_path):
    """Generate Galaxy XML from a mocked bio.tools answer and compare with the expected file.

    The JSON stored at *json_path* is served for the bio.tools URL of *name*
    version 1.0.  Every ``tmp_*.xml`` file found afterwards is compared with
    *xml_path*; when several files exist, the expected path is derived from
    *json_path* plus the first number found in the generated file name.
    Generated files are always removed.
    """
    # Open json to be the content of the requests_mock
    json_answer = main.json_from_file(json_path)
    with requests_mock.mock() as m:
        m.get('https://bio.tools/api/tool/' + name + '/version/1.0', json=json_answer)
        tool_json = main.json_from_biotools(name, '1.0')
        biotool = main.json_to_biotool(tool_json)
        tmp_file = 'tmp_test_xml.xml'
        main.write_xml(biotool, tmp_file)
        tmp_file_list = glob("tmp_*.xml")
        try:
            for temp_file in tmp_file_list:
                if len(tmp_file_list) > 1:
                    # Raw string: '\d' in a plain literal is an invalid escape.
                    number = re.findall(r'\d+', temp_file)[0]
                    xml_path = os.path.splitext(json_path)[0] + str(number) + '.xml'
                self.assertTrue(filecmp.cmp(xml_path, temp_file))
        finally:
            for temp_file in tmp_file_list:
                os.remove(temp_file)
def test_from_biotools_to_cwl(self, name, json_path, cwl_path):
    """Generate CWL from a mocked bio.tools answer and compare with the expected file.

    The JSON stored at *json_path* is served for the bio.tools URL of *name*
    version 1.0.  Every ``<name>_tmp_*.cwl`` file found afterwards is compared
    with *cwl_path*; when several files exist, the expected path is derived
    from *json_path* plus the first number found in the generated file name.
    Generated files are removed in ``finally`` so a failing comparison does
    not leave them behind.
    """
    # Open json to be the content of the requests_mock
    json_answer = main.json_from_file(json_path)
    with requests_mock.mock() as m:
        m.get('https://bio.tools/api/tool/' + name + '/version/1.0', json=json_answer)
        tool_json = main.json_from_biotools(name, '1.0')
        biotool = main.json_to_biotool(tool_json)
        tmp_file = name + '_tmp_test_cwl.cwl'
        main.write_cwl(biotool, tmp_file)
        tmp_file_list = glob(name + "_tmp_*.cwl")
        print(tmp_file_list)
        try:
            for temp_file in tmp_file_list:
                if len(tmp_file_list) > 1:
                    # Raw string: '\d' in a plain literal is an invalid escape.
                    number = re.findall(r'\d+', temp_file)[0]
                    cwl_path = os.path.splitext(json_path)[0] + str(number) + '.cwl'
                self.assertTrue(filecmp.cmp(cwl_path, temp_file))
        finally:
            for temp_file in tmp_file_list:
                os.remove(temp_file)


########### Main ###########
def _copy_file(self, src, dest, executable, bundle_root): """Copies a file into the bundle. Args: src: The path to the file or directory that should be added. dest: The path relative to the bundle root where the file should be stored. executable: A Boolean value indicating whether or not the file(s) should be made executable. bundle_root: The bundle root directory into which the files should be added. """ full_dest = os.path.join(bundle_root, dest) if (os.path.isfile(full_dest) and not filecmp.cmp(full_dest, src, shallow=False)): raise BundleConflictError(dest) self._makedirs_safely(os.path.dirname(full_dest)) shutil.copy(src, full_dest) os.chmod(full_dest, 0755 if executable else 0644)
def recursive_overwrite(src, dest, ignore=None):
    """Copy *src* onto *dest*, recursing into directories and skipping unchanged files.

    A file is copied (content and stat info) when the destination is missing
    or ``filecmp.cmp`` reports a difference; each copy is announced on stdout.

    Args:
        src: Source file or directory.
        dest: Destination path.
        ignore: Optional callable ``ignore(directory, names)`` returning the
            names to skip in that directory.
    """
    if not os.path.isdir(src):
        up_to_date = os.path.exists(dest) and filecmp.cmp(src, dest)
        if not up_to_date:
            print('copy {}'.format(src))
            shutil.copyfile(src, dest)
            shutil.copystat(src, dest)
        return

    if not os.path.isdir(dest):
        os.makedirs(dest, exist_ok=True)

    entries = os.listdir(src)
    skipped = set() if ignore is None else ignore(src, entries)
    for entry in entries:
        if entry in skipped:
            continue
        recursive_overwrite(os.path.join(src, entry), os.path.join(dest, entry), ignore)
def test_import_file_to_dir(self):
    """Import a temporary file into a data directory and check data, cache and state items."""
    with tempfile.NamedTemporaryFile(mode='w') as temp:
        content = 'Some data'
        temp.write(content)
        temp.flush()

        target_dir = os.path.join('data', self.dir1)
        settings = copy(self.settings)
        settings.parse_args('import {} {}'.format(temp.name, target_dir))
        CmdImportFile(settings).run()

        # The imported file keeps the temp file's base name inside target_dir.
        dvc_name = os.path.join(target_dir, os.path.basename(temp.name))
        data_item = self.path_factory.existing_data_item(dvc_name)
        data_path = data_item.data.relative
        cache_path = data_item.cache.relative

        self.assertTrue(os.path.exists(data_path))
        self.assertTrue(filecmp.cmp(data_path, cache_path))
        self.assertEqual(open(data_path).read(), content)

        self.assertTrue(os.path.exists(cache_path))
        self.assertEqual(open(cache_path).read(), content)

        self.assertTrue(os.path.exists(data_item.state.relative))
def test_copyfile(self):
    """Check utils.copyfile with a file destination and with a directory destination."""
    src, dest, dest_dir = 'file1', 'file2', 'testdir'

    with open(src, 'w+') as handle:
        handle.write('file1contents')
    os.mkdir(dest_dir)

    # file -> file
    utils.copyfile(src, dest)
    self.assertTrue(filecmp.cmp(src, dest))

    # file -> directory: the copy is expected under the source's name
    utils.copyfile(src, dest_dir)
    copied_into_dir = '{}/{}'.format(dest_dir, src)
    self.assertTrue(filecmp.cmp(src, copied_into_dir))

    shutil.rmtree(dest_dir)
    os.remove(src)
    os.remove(dest)
def test_restore_disk_in_domain(self, get_uncompressed_complete_backup, build_stopped_mock_domain, tmpdir):
    """Restore the ``vda`` disk of a backup into a domain and compare image and disk type."""
    backup = get_uncompressed_complete_backup
    domain = build_stopped_mock_domain

    src_img = backup.get_complete_path_of(backup.disks["vda"])
    domain.set_storage_basedir(str(tmpdir))
    dst_img = get_domain_disks_of(domain.XMLDesc(), "vda")["vda"]["src"]

    backup.restore_and_replace_disk_of("vda", domain, "vda")

    assert filecmp.cmp(src_img, dst_img)
    restored_type = get_domain_disks_of(domain.XMLDesc())["vda"]["type"]
    backup_type = get_domain_disks_of(backup.dom_xml)["vda"]["type"]
    assert restored_type == backup_type
def test_fetch(self):
    """Create a source file, run fetch(), and verify the single copied DataFile."""
    prefix = tempfile.gettempprefix()
    source_name = "%sbigFileXYZA" % prefix
    copied_name = "%sbigFileXYZB" % prefix

    # Create temp file to copy
    with open(os.path.join(self._copyFile.copy_from_prefix, source_name), 'w') as handle:
        handle.write("Lots of interesting stuff")

    # Perform copy
    interpolate_dict = {"project_identifier": prefix,
                        "product_identifier": "bigFile",
                        "run_identifier": "XYZ"}
    result_paths = self._copyFile.fetch(interpolate_dict=interpolate_dict)

    # Test copied file exists and DataFile matches
    self.assertEqual(len(result_paths), 1)
    df = result_paths[0]
    self.assertEqual(df.file_name, copied_name)
    self.assertEqual(df.location, os.path.join(tempfile.gettempdir(), copied_name))
    self.assertEqual(df.equipment, self._equipmentSequencer)

    source_path = os.path.join(tempfile.gettempdir(), source_name)
    copied_path = os.path.join(tempfile.gettempdir(), copied_name)
    self.assertIs(filecmp.cmp(source_path, copied_path), True)

    # Clean up
    os.remove(os.path.join(tempfile.gettempdir(), source_name))
    os.remove(os.path.join(tempfile.gettempdir(), copied_name))
def test_upload_success(repository_upload_fixture):
    """Upload a built package once, check the incoming files, then expect a second upload to fail."""
    fixture = repository_upload_fixture
    package = fixture.package
    package.build()

    # Case where it works
    assert not fixture.debian_repository.is_uploaded(package)
    assert package.is_built

    fixture.debian_repository.upload(package, [])
    assert fixture.debian_repository.is_uploaded(package)

    assert package.files_to_distribute
    for filename in package.files_to_distribute:
        incoming_filename = os.path.join(fixture.incoming_directory.name,
                                         os.path.basename(filename))
        assert filecmp.cmp(filename, incoming_filename)

    # Case where you try upload something again
    assert fixture.debian_repository.is_uploaded(package)
    assert package.is_built

    with expected(AlreadyUploadedException):
        fixture.debian_repository.upload(package, [])
def test_create_less_simple_xls(self):
    """Append 50 two-column rows to the simple workbook and compare the saved file."""
    wb, ws = self.create_simple_xls()

    more_content = [
        [u'A{0}'.format(i), u'Za?ó?? g??l? ja?? {0} {1}'.format(i, LOREM_IPSUM)]
        for i in range(1000, 1050)
    ]

    # Extra rows start at sheet row index 3.
    for r_idx, content_row in enumerate(more_content, 3):
        for c_idx, cell in enumerate(content_row):
            ws.write(r_idx, c_idx, cell)

    output_path = in_tst_output_dir('less_simple.xls')
    wb.save(output_path)
    self.assertTrue(filecmp.cmp(in_tst_dir('less_simple.xls'),
                                in_tst_output_dir('less_simple.xls'),
                                shallow=False))
def compare(fname1, fname2, binary_mode=False):
    """Tell whether two files are equal.

    Args:
        fname1: Path of the first file.
        fname2: Path of the second file.
        binary_mode: When true, delegate to ``cmp(..., shallow=False)`` for a
            byte-for-byte comparison.  Otherwise compare line by line,
            ignoring leading and trailing whitespace on every line.

    Returns:
        bool: True if the files are considered equal.

    Both files are opened in a ``with`` block so the handles are closed on
    every return path.
    """
    if binary_mode:
        return cmp(fname1, fname2, shallow=False)

    with open(fname1, 'r') as f1, open(fname2, 'r') as f2:
        while True:
            line1 = f1.readline()
            line2 = f2.readline()
            # readline() returns '' only at end of file.
            if not line1 and not line2:
                # both files ended at the same time: equal
                return True
            if not line1 or not line2:
                # exactly one file ended early: not equal
                return False
            if line1.strip() != line2.strip():
                return False
def test_open_foam_write_comparison(self):
    """Modify some parsed OpenFOAM points, write them, and compare with the reference file.

    The cleanup is registered right after the file is written, so the output
    is removed even when the comparison fails.
    """
    open_foam_handler = ofh.OpenFoamHandler()
    mesh_points = open_foam_handler.parse('tests/test_datasets/test_openFOAM')

    mesh_points[0] = [-14., 1.55, 0.2]
    mesh_points[1] = [-14.3, 2.55, 0.3]
    mesh_points[2] = [-14.3, 2.55, 0.3]
    mesh_points[2000] = [7.8, -42.8, .0]
    mesh_points[2001] = [8.8, -41.8, .1]
    mesh_points[2002] = [9.8, -40.8, .0]
    mesh_points[-3] = [236.3, 183.7, 0.06]
    mesh_points[-2] = [237.3, 183.7, 0.06]
    mesh_points[-1] = [236.3, 185.7, 0.06]

    outfilename = 'tests/test_datasets/test_openFOAM_out'
    outfilename_expected = 'tests/test_datasets/test_openFOAM_out_true'
    open_foam_handler.write(mesh_points, outfilename)
    self.addCleanup(os.remove, outfilename)

    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_write_parameters_filename_default(self):
    """Write RBF parameters to ``test.prm`` and compare with the stored default file.

    The leftover debugging ``print`` of the comparison result was dropped,
    and the output file is removed through ``addCleanup`` so it is deleted
    even when the assertion fails.
    """
    params = rbfp.RBFParameters()
    params.basis = 'gaussian_spline'
    params.radius = 0.5
    params.n_control_points = 8
    params.power = 2

    # The same 8 x 3 point set is used for original and deformed control
    # points; each attribute gets its own array.
    unit_cube = [0., 0., 0., 0., 0., 1., 0., 1., 0., 1., 0., 0.,
                 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1., 1.]
    params.original_control_points = np.array(unit_cube).reshape((8, 3))
    params.deformed_control_points = np.array(unit_cube).reshape((8, 3))

    outfilename = 'test.prm'
    params.write_parameters(outfilename)
    self.addCleanup(os.remove, outfilename)

    outfilename_expected = 'tests/test_datasets/parameters_rbf_default.prm'
    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_unv_write_comparison_2(self):
    """Modify single coordinates of parsed UNV points, write them, and compare with the reference.

    The cleanup is registered right after the file is written, so the output
    is removed even when the comparison fails.
    """
    unv_handler = uh.UnvHandler()
    mesh_points = unv_handler.parse('tests/test_datasets/test_square.unv')

    mesh_points[0][0] = 2.2
    mesh_points[5][1] = 4.3
    mesh_points[9][2] = 0.5
    mesh_points[45][0] = 7.2
    mesh_points[132][1] = -1.2
    mesh_points[255][2] = -3.6

    outfilename = 'tests/test_datasets/test_square_out.unv'
    outfilename_expected = 'tests/test_datasets/test_square_out_true.unv'
    unv_handler.write(mesh_points, outfilename)
    self.addCleanup(os.remove, outfilename)

    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_stl_write_comparison(self):
    """Modify some parsed STL points, write them, and compare with the reference file.

    The cleanup is registered right after the file is written, so the output
    is removed even when the comparison fails.
    """
    stl_handler = sh.StlHandler()
    mesh_points = stl_handler.parse('tests/test_datasets/test_sphere.stl')

    mesh_points[0] = [-40.2, -20.5, 60.9]
    mesh_points[1] = [-40.2, -10.5, 60.9]
    mesh_points[2] = [-40.2, -10.5, 60.9]
    mesh_points[500] = [-40.2, -20.5, 60.9]
    mesh_points[501] = [-40.2, -10.5, 60.9]
    mesh_points[502] = [-40.2, -10.5, 60.9]
    mesh_points[1000] = [-40.2, -20.5, 60.9]
    mesh_points[1001] = [-40.2, -10.5, 60.9]
    mesh_points[1002] = [-40.2, -10.5, 60.9]

    outfilename = 'tests/test_datasets/test_sphere_out.stl'
    outfilename_expected = 'tests/test_datasets/test_sphere_out_true.stl'
    stl_handler.write(mesh_points, outfilename)
    self.addCleanup(os.remove, outfilename)

    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_stl_write_ascii_from_binary(self):
    """Parse ``test_sphere_bin.stl``, write with ``write_bin=False``, and compare with the reference.

    The cleanup is registered right after the file is written, so the output
    is removed even when the comparison fails.
    """
    stl_handler = sh.StlHandler()
    mesh_points = stl_handler.parse('tests/test_datasets/test_sphere_bin.stl')

    mesh_points[0] = [-40.2, -20.5, 60.9]
    mesh_points[1] = [-40.2, -10.5, 60.9]
    mesh_points[2] = [-40.2, -10.5, 60.9]
    mesh_points[500] = [-40.2, -20.5, 60.9]
    mesh_points[501] = [-40.2, -10.5, 60.9]
    mesh_points[502] = [-40.2, -10.5, 60.9]
    mesh_points[1000] = [-40.2, -20.5, 60.9]
    mesh_points[1001] = [-40.2, -10.5, 60.9]
    mesh_points[1002] = [-40.2, -10.5, 60.9]

    outfilename = 'tests/test_datasets/test_sphere_out.stl'
    outfilename_expected = 'tests/test_datasets/test_sphere_out_true.stl'
    stl_handler.write(mesh_points, outfilename, write_bin=False)
    self.addCleanup(os.remove, outfilename)

    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_get_file():
    """Tests retrieving file from localfile storage.

    Stores ``cave.png`` through ``media.set_file`` and checks that the file
    returned by ``media.get_file`` compares equal to the original.  The
    source is opened with ``open`` because the ``file`` builtin exists only
    on Python 2.
    """
    setup()
    source_path = 'examples/telco/resources/cave.png'
    with open(source_path, 'rb') as infile:
        stored_filename = media.set_file(infile)
    assert stored_filename == 'cave.png'

    retrieved_file = media.get_file('cave.png')
    assert filecmp.cmp(retrieved_file.name, 'examples/telco/resources/cave.png')
def test_get_file():
    """Tests retrieving file from localfile storage.

    Stores ``cave.png`` through ``localfile.set_file`` and checks that the
    file returned by ``localfile.get_file`` compares equal to the original.
    The source is opened with ``open`` because the ``file`` builtin exists
    only on Python 2.
    """
    setup()
    source_path = 'examples/telco/resources/cave.png'
    with open(source_path, 'rb') as infile:
        stored_filename = localfile.set_file(infile)
    assert stored_filename == 'cave.png'

    retrieved_file = localfile.get_file('cave.png')
    assert filecmp.cmp(retrieved_file.name, 'examples/telco/resources/cave.png')
def test_simple_local_copy(tmpdir):
    """Copy one local file to a new path through ``file://`` resources and compare both."""
    src = tmpdir.join('alpha')
    dst = tmpdir.join('beta')
    src.write('some data')

    # Before: only the source exists
    assert src.check()
    assert not dst.check()

    source_resource = Resource('file://' + src.strpath)
    target_resource = Resource('file://' + dst.strpath)
    copy(source_resource, target_resource)

    # After: both exist and compare equal
    assert src.check()
    assert dst.check()
    assert filecmp.cmp(src.strpath, dst.strpath)
def test_simple_local_copy_with_callback(tmpdir):
    """Copy a local file with chunk sizes 1 and 2 and count the callback invocations."""
    count = 0

    def wrapper(size):
        # Callback handed to copy(); only the number of calls matters.
        nonlocal count
        count += 1

    src = tmpdir.join('alpha')
    dst = tmpdir.join('beta')
    data = b'some data'
    src.write(data)

    def copy_and_check(chunk_size):
        # One full copy cycle, starting from a zeroed counter and no destination.
        nonlocal count
        count = 0
        assert src.check()
        assert not dst.check()
        copy(Resource('file://' + src.strpath), Resource('file://' + dst.strpath,),
             size=chunk_size, callback_freq=1, callback=wrapper)
        assert src.check()
        assert dst.check()
        assert filecmp.cmp(src.strpath, dst.strpath)
        assert count == estimate_nb_cycles(len(data), chunk_size)

    copy_and_check(1)
    dst.remove()
    copy_and_check(2)
def build():
    """Build and apply a user configuration.

    The freshly composed configuration is compared with the running one.
    When they differ, the running file is backed up to ``<name>.prev``,
    replaced, re-owned by the ``radiusd`` user, and the change is announced.
    """
    env = _get_vars("/etc/environment")
    env.validate(full=True)
    os.chdir(env.net_config)
    compose(env)

    new_config = os.path.join(env.net_config, FILE_NAME)
    run_config = os.path.join(env.freeradius_repo, PYTHON_MODS, FILE_NAME)

    unchanged = filecmp.cmp(new_config, run_config)
    if not unchanged:
        print('change detected')
        shutil.copyfile(run_config, run_config + ".prev")
        shutil.copyfile(new_config, run_config)
        owner = pwd.getpwnam("radiusd")
        os.chown(run_config, owner.pw_uid, owner.pw_gid)
        update_wiki(env, run_config)

        hashed = get_file_hash(FILE_NAME)
        # Use the text stored in the indicator file when it exists.
        git = "latest commit"
        git_indicator = env.working_dir + "git"
        if os.path.exists(git_indicator):
            with open(git_indicator, 'r') as indicator:
                git = indicator.read().strip()

        status = "ready"
        _smirc("{} -> {} ({})".format(status, git, hashed))
        _feed(env, "radius configuration updated")
    daily_report(env, run_config)
def test_main(self):
    """Run main() as ``snapshot <HTML_FILE>`` with a faked process stdout and compare output.png."""
    # The patched popen hands back the base64 image as the process stdout.
    self.fake_popen.return_value.stdout = BytesIO(get_base64_image())

    argv = ['snapshot', HTML_FILE]
    with patch.object(sys, 'argv', argv):
        main()

    expected_image = get_fixture('sample.png')
    assert filecmp.cmp('output.png', expected_image)