Python filecmp module: cmp() code examples

We have extracted the following 49 code examples from open-source Python projects to illustrate how to use filecmp.cmp().

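filecmp.cmp(f1, f2, shallow=True) returns True when the two files are considered equal. With the default shallow=True, files whose os.stat() signatures (file type, size, and modification time) match are treated as equal without being read; pass shallow=False to compare the files' contents. Results are cached and can be discarded with filecmp.clear_cache() (Python 3.4+). A minimal Python 3 sketch of both modes (the file names are illustrative):

import filecmp
import os
import shutil
import tempfile

with tempfile.TemporaryDirectory() as tmpdir:
    original = os.path.join(tmpdir, 'original.txt')
    duplicate = os.path.join(tmpdir, 'duplicate.txt')
    with open(original, 'w') as f:
        f.write('some content\n')
    shutil.copy2(original, duplicate)  # copy2 also preserves the modification time

    # shallow=True (the default): matching os.stat() signatures are enough
    print(filecmp.cmp(original, duplicate))                 # True
    # shallow=False: the files are read and their contents compared
    print(filecmp.cmp(original, duplicate, shallow=False))  # True

    filecmp.clear_cache()  # drop cached results if the files may change on disk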
Project: PlasoScaffolder    Author: ClaudiaSaxer
def testCopyFile(self):
    """Tests if the copying of a file none existing beforhand works."""
    expected_content = "this is test content."

    with tempfile.TemporaryDirectory() as tmpdir:
      source = os.path.join(tmpdir, self.file)
      destination = os.path.join(tmpdir, "copy", self.file)

      with open(source, "a") as f:
        f.write(expected_content)

      handler = file_handler.FileHandler()
      self.assertFalse(os.path.exists(destination))
      handler.CopyFile(source, destination)
      self.assertTrue(os.path.exists(destination))
      self.assertTrue(filecmp.cmp(destination, source))
Project: code    Author: ActiveState
def true_duplicates(files):
    """
    Compare the given files, breaking them down into groups with identical
    content.
    """
    while len(files) > 1:
        next_set = []
        this_set = []
        master = files[0]
        this_set.append(master)
        for other in files[1:]:
            if filecmp.cmp(master, other, False):
                this_set.append(other)
            else:
                next_set.append(other)
        if len(this_set) > 1:
            yield this_set
        files = next_set
Project: aws-encryption-sdk-cli    Author: awslabs
def test_file_to_file_decrypt_required_encryption_context_success(tmpdir, required_encryption_context):
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    ) + ' --encryption-context ' + required_encryption_context

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
Project: aws-encryption-sdk-cli    Author: awslabs
def test_file_to_file_cycle(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
Project: aws-encryption-sdk-cli    Author: awslabs
def test_file_to_file_cycle_target_through_symlink(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    output_dir = tmpdir.mkdir('output')
    os.symlink(str(output_dir), str(tmpdir.join('output_link')))
    ciphertext = tmpdir.join('output_link', 'ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
Project: aws-encryption-sdk-cli    Author: awslabs
def test_file_to_file_cycle_with_caching(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template(caching=True).format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
Project: aws-encryption-sdk-cli    Author: awslabs
def test_file_to_dir_cycle(tmpdir):
    inner_dir = tmpdir.mkdir('inner')
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = inner_dir.join('source_plaintext.encrypted')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(inner_dir)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    assert os.path.isfile(str(ciphertext))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
Project: Play.Excel    Author: abhijithasokan
def validate(self, pid):
        """Compare the output file with the key.

        Args:
            pid: The id of the problem to map to the correct key.

        Returns:
            "AC" (Accepted) if the comparison succeeds,
            "WA" (Wrong Answer) if it fails.
        """
        if filecmp.cmp(self.cwd + "/tmp/output.txt",
                       self.cwd + "/env/key/key" + str(pid) + ".txt"):
            return "AC"
        else:
            return "WA"
Project: betaPika    Author: alchemistake
def delete_identical_files():
    for battle_folder in os.listdir(download_path):
        battle_folder_path = os.path.join(os.curdir, download_folder_name, battle_folder)

        if not battle_folder.startswith('.') and os.path.isdir(battle_folder_path):
            battle_files = os.listdir(battle_folder_path)

            previous_battle_file = os.path.join(os.curdir, "main.py")
            for battle_file in battle_files:
                current_battle_file = os.path.join(os.curdir, download_folder_name, battle_folder, battle_file)
                if filecmp.cmp(previous_battle_file, current_battle_file, shallow=0):
                    os.remove(current_battle_file)
                    print current_battle_file, "Deleted, copy of another file"
                else:
                    previous_battle_file = current_battle_file
    print "Identical file search complete."
Project: cluster-genesis    Author: open-power-ref-design-toolkit
def compare_files(file1, file2, log):
    """Compare two files

    Use Python's filecmp module to compare two files and log/print
    results.

    Args:
        file1 (string): Path of first file to compare
        file2 (string): Path of second file to compare
        log (:obj:`Logger`): Log file object.

    Returns:
        boolean: True if they seem equal, False otherwise
    """
    if filecmp.cmp(file1, file2):
        msg = ("Two MAC Address Table Files Are Identical! '%s' & '%s'"
               % (file1, file2))
        log.error(msg)
        print("Error: " + msg)
        return True
    else:
        return False
Project: mx    Author: graalvm
def _sorted_unique_jdk_configs(configs):
    path_seen = set()
    unique_configs = [c for c in configs if c.home not in path_seen and not path_seen.add(c.home)]

    def _compare_configs(c1, c2):
        if c1 == _default_java_home:
            if c2 != _default_java_home:
                return 1
        elif c2 == _default_java_home:
            return -1
        if c1 in _extra_java_homes:
            if c2 not in _extra_java_homes:
                return 1
        elif c2 in _extra_java_homes:
            return -1
        return VersionSpec.__cmp__(c1.version, c2.version)
    return sorted(unique_configs, cmp=_compare_configs, reverse=True)
Project: AutoMergeTool    Author: xgouchet
def test_single_conflict_unsolved(self):
        """Tests a walker against a file with a single conflict, without solving it"""

        # Given a file to merge
        file = CW_PATH.format('single_conflict')
        walker = ConflictsWalker(file, 'test', REPORT_NONE, False)

        # When walking the conflicts
        self.assertTrue(walker.has_more_conflicts())
        self.assertFalse(walker.has_more_conflicts())
        walker.end(False)

        # Then check the output
        self.assertTrue(filecmp.cmp(walker.merged, file))
        self.assertEqual(walker.get_merge_status(), ERROR_CONFLICTS)
        os.remove(walker.merged)
Project: AutoMergeTool    Author: xgouchet
def test_single_conflict_rewritten(self):
        """Tests a walker against a file with a single conflict, without solving it"""

        # Given a file to merge
        file = CW_PATH.format('single_conflict')
        walker = ConflictsWalker(file, 'test', REPORT_NONE, False)

        # When walking the conflicts
        self.assertTrue(walker.has_more_conflicts())
        conflict = walker.next_conflict()
        conflict.rewrite(RESOLUTION)
        self.assertFalse(walker.has_more_conflicts())
        walker.end(False)

        # Then check the output
        self.assertTrue(filecmp.cmp(walker.merged, CW_PATH.format('single_conflict_resolved')))
        self.assertEqual(walker.get_merge_status(), ERROR_CONFLICTS)
        os.remove(walker.merged)
Project: AutoMergeTool    Author: xgouchet
def test_single_conflict_solved(self):
        """Tests a walker against a file with a single conflict, and solving it"""

        # Given a file to merge
        file = CW_PATH.format('single_conflict')
        walker = ConflictsWalker(file, 'test', REPORT_NONE, False)

        # When walking the conflicts
        self.assertTrue(walker.has_more_conflicts())
        conflict = walker.next_conflict()
        conflict.resolve(RESOLUTION)
        self.assertFalse(walker.has_more_conflicts())
        walker.end(False)

        # Then check the output
        self.assertTrue(filecmp.cmp(walker.merged, CW_PATH.format('single_conflict_resolved')))
        self.assertEqual(walker.get_merge_status(), SUCCESS)
        os.remove(walker.merged)
Project: pymongo-schema    Author: pajachiet
def test02_transform():
    base_output = "output_fctl_data_dict"
    outputs = {}
    extensions = ['html', 'xlsx', 'tsv', 'md']
    for ext in extensions:
        outputs[ext] = "{}.{}".format(base_output, ext)

    exp = os.path.join(TEST_DIR, 'resources', 'expected', 'data_dict')
    argv = ['transform', SCHEMA_FILE, '--output', base_output, '--columns',
            'Field_compact_name', 'Field_name', 'Full_name', 'Description', 'Count', 'Percentage',
            'Types_count',
            '--formats'] + extensions
    main(argv)

    assert filecmp.cmp(outputs['tsv'], "{}.tsv".format(exp))
    assert filecmp.cmp(outputs['md'], "{}.md".format(exp))
    with open(outputs['html']) as out_fd, \
            open("{}.html".format(exp)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')
    res = [cell.value for row in load_workbook(outputs['xlsx']).active for cell in row]
    exp = [cell.value for row in load_workbook("{}.xlsx".format(exp)).active for cell in row]
    assert res == exp
    for output in outputs.values():
        os.remove(output)
Project: pymongo-schema    Author: pajachiet
def test04_transform_default_cols():
    base_output = "output_fctl_data_dict_default"
    outputs = {}
    extensions = ['html', 'xlsx', 'tsv', 'md']
    for ext in extensions:
        outputs[ext] = "{}.{}".format(base_output, ext)

    exp = os.path.join(TEST_DIR, 'resources', 'expected', 'data_dict_default')
    argv = ['transform', SCHEMA_FILE, '--output', base_output, '--formats'] + extensions
    main(argv)

    assert filecmp.cmp(outputs['tsv'], "{}.tsv".format(exp))
    assert filecmp.cmp(outputs['md'], "{}.md".format(exp))
    with open(outputs['html']) as out_fd, \
            open("{}.html".format(exp)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')
    res = [cell.value for row in load_workbook(outputs['xlsx']).active for cell in row]
    exp = [cell.value for row in load_workbook("{}.xlsx".format(exp)).active for cell in row]
    assert res == exp
    for output in outputs.values():
        os.remove(output)
Project: pymongo-schema    Author: pajachiet
def test06_compare():
    base_output = "output_fctl_diff"
    outputs = {}
    extensions = ['html', 'xlsx', 'tsv', 'md']
    for ext in extensions:
        outputs[ext] = "{}.{}".format(base_output, ext)

    exp = os.path.join(TEST_DIR, 'resources', 'functional', 'expected', 'diff')
    exp_schema = os.path.join(TEST_DIR, 'resources', 'input', 'test_schema2.json')
    argv = ['compare', SCHEMA_FILE, exp_schema, '--output', base_output, '--formats'] + extensions
    main(argv)

    assert filecmp.cmp(outputs['tsv'], "{}.tsv".format(exp))
    assert filecmp.cmp(outputs['md'], "{}.md".format(exp))
    with open(outputs['html']) as out_fd, \
            open("{}.html".format(exp)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')
    res = [cell.value for row in load_workbook(outputs['xlsx']).active for cell in row]
    exp = [cell.value for row in load_workbook("{}.xlsx".format(exp)).active for cell in row]
    assert res == exp
    for output in outputs.values():
        os.remove(output)
Project: pymongo-schema    Author: pajachiet
def test07_compare_detailed():
    base_output = "output_fctl_detailed_diff"
    outputs = {}
    extensions = ['html', 'xlsx', 'tsv', 'md']
    for ext in extensions:
        outputs[ext] = "{}.{}".format(base_output, ext)

    exp = os.path.join(TEST_DIR, 'resources', 'functional', 'expected', 'detailed_diff')
    exp_schema = os.path.join(TEST_DIR, 'resources', 'input', 'test_schema2.json')
    argv = ['compare', SCHEMA_FILE, exp_schema, '--output', base_output, '--detailed_diff',
            '--formats'] + extensions
    main(argv)

    assert filecmp.cmp(outputs['tsv'], "{}.tsv".format(exp))
    assert filecmp.cmp(outputs['md'], "{}.md".format(exp))
    with open(outputs['html']) as out_fd, \
            open("{}.html".format(exp)) as exp_fd:
        assert out_fd.read().replace(' ', '') == exp_fd.read().replace(' ', '')
    res = [cell.value for row in load_workbook(outputs['xlsx']).active for cell in row]
    exp = [cell.value for row in load_workbook("{}.xlsx".format(exp)).active for cell in row]
    assert res == exp
    for output in outputs.values():
        os.remove(output)
Project: pbtranscript    Author: PacificBiosciences
def test_all(self):
        """Test All"""
        expected_r = GroupRecord(name="group1",
                                 members=["member0", "member1", "member2"])
        with GroupReader(GROUP_FN_1) as reader:
            records = [r for r in reader]
            self.assertEqual(len(records), 1)
            self.assertEqual(records[0], expected_r)

        expected_r = GroupRecord(name="PB.1.1",
                                 members="i0_HQ_sampleb92221|c8319/f2p0/463,i0_HQ_sampleb92221|c28/f4p0/460,i0_HQ_sampleb92221|c524/f2p0/462,i0_HQ_sampleb92221|c539/f2p0/460,i0_HQ_sampleb92221|c7864/f22p0/462,i0_HQ_sampleb92221|c7959/f2p0/461,i0_HQ_sampleb92221|c8090/f3p0/462,i0_HQ_sampleb92221|c8099/f3p0/459,i0_HQ_sampleb92221|c8136/f2p0/461,i0_HQ_sampleb92221|c428/f2p0/459".split(','))
        with GroupReader(GROUP_FN_2) as reader:
            records = [r for r in reader]
            self.assertEqual(len(records), 51)
            self.assertEqual(records[0], expected_r)
            out_fn = op.join(OUT_DIR, "test_GroupWriter.txt")
            with GroupWriter(out_fn) as writer:
                for r in records:
                    writer.writeRecord(r)

            self.assertTrue(filecmp.cmp(out_fn, GROUP_FN_2))
Project: pbtranscript    Author: PacificBiosciences
def test_write(self):
        """Test ClusterSummary.write."""
        obj = ClusterSummary()
        obj.num_consensus_isoforms = 97
        obj.num_total_bases = 97 * 3945

        outFN = op.join(self.testDir, "out/test_ClusterSummary.txt")
        stdoutFN = op.join(self.testDir, "stdout/test_ClusterSummary.txt")
        obj.write(outFN)
        self.assertTrue(filecmp.cmp(outFN, stdoutFN))

        outFN = op.join(self.testDir, "out/test_ClusterSummary.json")
        stdoutFN = op.join(self.testDir, "stdout/test_ClusterSummary.json")
        obj.write(outFN)

        rm_version_string(outFN, outFN + "tmp1")
        rm_version_string(stdoutFN, outFN + "tmp2")
        _compare_reports(self, outFN, stdoutFN)
        #self.assertTrue(filecmp.cmp(outFN + "tmp1", outFN + "tmp2"))
Project: pbtranscript    Author: PacificBiosciences
def test_concatenate_sam(self):
        """Test concatenate_sam(in_sam_files, out_sam)"""
        in_sam_files = [op.join(_SIV_DIR_, f)
                        for f in ["chunk0.sam", "chunk1.sam"]]
        out_sam = op.join(_OUT_DIR_, 'test concatenated.sam')
        expected_sam = op.join(_SIV_DIR_, "sorted-gmap-output.sam")
        concatenate_sam(in_sam_files, out_sam)

        self.assertTrue(op.exists(out_sam))
        self.assertTrue(op.exists(expected_sam))

        #self.assertTrue(filecmp.cmp(out_sam, expected_sam))
        out = [l for l in open(out_sam, 'r') if not l.startswith('@PG')]
        exp = [l for l in open(expected_sam, 'r') if not l.startswith('@PG')]
        # test everything other than @PG are identical
        self.assertEqual(out, exp)

        # chunk01.sam and chunk02.sam has identical PG ID in their SAM headers
        # test concatenated @PG IDs are not conflicting
        pg_ids = [x[3:] for pg in [l for l in open(out_sam, 'r') if l.startswith('@PG')]
                  for x in pg.split('\t') if x.startswith('ID:')]
        self.assertEqual(len(pg_ids), len(set(pg_ids)))
        self.assertEqual(len(pg_ids), 2)
Project: fritzchecksum    Author: mementum
def prepare_issfile(self):
        # Create temp file
        ofilehandle, ofilepath = tempfile.mkstemp()  # open temporary file
        ofile = os.fdopen(ofilehandle, 'w')  # wrap fhandle in "file object"

        ifilepath = self.getissfile()
        ifile = open(ifilepath)  # open original file
        for line in ifile:
            line = self.replace_lines(line)
            ofile.write(line)

        ofile.close()  # close temp file
        ifile.close()  # close original file

        equal = filecmp.cmp(ifilepath, ofilepath, shallow=False)
        if not equal:
            os.remove(ifilepath)  # remove original file
            shutil.move(ofilepath, ifilepath)  # move new file
        else:
            os.remove(ofilepath)  # remove temp file
Project: pycom-libraries    Author: pycom
def get_diff_list(left, right, ignore=['.DS_Store', 'pymakr.conf']):
    left_paths = get_all_paths(left, ignore=ignore)
    right_paths = get_all_paths(right, ignore=ignore)
    new_files = right_paths.difference(left_paths)
    to_delete = left_paths.difference(right_paths)
    common = left_paths.intersection(right_paths)

    to_update = []
    for f in common:
        if not filecmp.cmp(os.path.join(left, f),
                           os.path.join(right, f),
                           shallow=False):
            to_update.append(f)

    return (to_delete, new_files, (to_update))


# Searches the current working directory for a file starting with "firmware_"
# followed by a version number higher than `current_ver` as per LooseVersion.
# Returns None if such a file does not exist.
# Parameters
#    path - the path to the directory to be searched
#    current_ver - the result must be higher than this version
#
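
For comparing whole directory trees, as get_diff_list() above does, the standard library also provides filecmp.cmpfiles() and the filecmp.dircmp class; note that dircmp itself performs only shallow comparisons, so candidates can be re-checked with filecmp.cmp(..., shallow=False) when byte equality matters. A minimal sketch (the helper name diff_trees and its default ignore list are illustrative assumptions, not part of the project above):

import filecmp
import os

def diff_trees(left, right, ignore=None):
    """Recursively collect relative paths of files that differ between two trees."""
    if ignore is None:
        ignore = ['.DS_Store', 'pymakr.conf']  # assumed default, mirroring get_diff_list() above
    changed = []

    def walk(dc, prefix=''):
        # diff_files lists common files whose (shallow) comparison reported a difference
        changed.extend(os.path.join(prefix, name) for name in dc.diff_files)
        for name, sub in dc.subdirs.items():
            walk(sub, os.path.join(prefix, name))

    walk(filecmp.dircmp(left, right, ignore=ignore))
    return changed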
Project: incubator-airflow-old    Author: apache
def _check_file_equality(self, fn_1, fn_2, ext):
        # gz files contain mtime and filename in the header that
        # causes filecmp to return False even if contents are identical
        # Hence decompress to test for equality
        if(ext == '.gz'):
            with gzip.GzipFile(fn_1, 'rb') as f_1,\
                 NamedTemporaryFile(mode='wb') as f_txt_1,\
                 gzip.GzipFile(fn_2, 'rb') as f_2,\
                 NamedTemporaryFile(mode='wb') as f_txt_2:
                shutil.copyfileobj(f_1, f_txt_1)
                shutil.copyfileobj(f_2, f_txt_2)
                f_txt_1.flush()
                f_txt_2.flush()
                return filecmp.cmp(f_txt_1.name, f_txt_2.name, shallow=False)
        else:
            return filecmp.cmp(fn_1, fn_2, shallow=False)
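
As the comment at the top of _check_file_equality() notes, a gzip header embeds the original file name and modification time, so comparing .gz files byte-for-byte can report a difference even when the compressed payloads are identical. An illustrative alternative (the helper name gz_contents_equal is an assumption, not part of the project above) is to compare the decompressed streams directly instead of going through temporary files:

import gzip

def gz_contents_equal(fn_1, fn_2, chunk_size=64 * 1024):
    """Compare the decompressed contents of two gzip files chunk by chunk."""
    with gzip.open(fn_1, 'rb') as f_1, gzip.open(fn_2, 'rb') as f_2:
        while True:
            chunk_1 = f_1.read(chunk_size)
            chunk_2 = f_2.read(chunk_size)
            if chunk_1 != chunk_2:
                return False
            if not chunk_1:  # both streams ended at the same point
                return True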
Project: incubator-airflow-old    Author: apache
def test_uncompress_file(self):
        # Testing txt file type
        self.assertRaisesRegexp(NotImplementedError,
                                "^Received .txt format. Only gz and bz2.*",
                                compression.uncompress_file,
                                **{'input_file_name': None,
                                   'file_extension': '.txt',
                                   'dest_dir': None
                                   })
        # Testing gz file type
        fn_txt = self._get_fn('.txt')
        fn_gz = self._get_fn('.gz')
        txt_gz = compression.uncompress_file(fn_gz, '.gz', self.tmp_dir)
        self.assertTrue(filecmp.cmp(txt_gz, fn_txt, shallow=False),
                        msg="Uncompressed file doest match original")
        # Testing bz2 file type
        fn_bz2 = self._get_fn('.bz2')
        txt_bz2 = compression.uncompress_file(fn_bz2, '.bz2', self.tmp_dir)
        self.assertTrue(filecmp.cmp(txt_bz2, fn_txt, shallow=False),
                        msg="Uncompressed file doest match original")
Project: pic2vec    Author: datarobot
def compare_featurizer_class(featurizer,
                             scaled_size,
                             featurized_data,
                             downsample_size,
                             image_column_headers,
                             automatic_downsample,
                             csv_path,
                             image_list,
                             depth,
                             featurized=False):
    """Check the necessary assertions for a featurizer image."""
    assert featurizer.scaled_size == scaled_size
    assert np.allclose(featurizer.features, featurized_data, atol=ATOL)
    assert featurizer.downsample_size == downsample_size
    assert featurizer.image_column_headers == image_column_headers
    assert featurizer.auto_sample == automatic_downsample
    assert featurizer.csv_path == csv_path
    assert featurizer.image_list == image_list
    assert featurizer.depth == depth
    if featurized:
        assert filecmp.cmp('{}_full'.format(csv_path), CHECK_CSV.format(featurizer.model_name))
        assert featurizer.full_dataframe == pd.read_csv(CHECK_CSV.format(featurizer.model_name))
Project: ToolDog    Author: bio-tools
def test_from_biotools_to_galaxy(self, name, json_path, xml_path):
        # Open json to be the content of the requests_mock
        json_answer = main.json_from_file(json_path)
        with requests_mock.mock() as m:
            m.get('https://bio.tools/api/tool/' + name + '/version/1.0',\
                  json=json_answer)
            json = main.json_from_biotools(name, '1.0')
            biotool = main.json_to_biotool(json)
            tmp_file = 'tmp_test_xml.xml'
            main.write_xml(biotool,tmp_file)
            tmp_file_list = glob("tmp_*.xml")
            try:
                for temp_file in tmp_file_list:
                    if len(tmp_file_list) > 1:
                        xml_path = os.path.splitext(json_path)[0] + \
                                   str(re.findall(r'\d+', temp_file)[0]) + '.xml'
                    self.assertTrue(filecmp.cmp(xml_path,temp_file))
            finally:
                for temp_file in tmp_file_list:
                    os.remove(temp_file)
Project: ToolDog    Author: bio-tools
def test_from_biotools_to_cwl(self, name, json_path, cwl_path):
        # Open json to be the content of the requests_mock
        json_answer = main.json_from_file(json_path)
        with requests_mock.mock() as m:
            m.get('https://bio.tools/api/tool/' + name + '/version/1.0',\
                  json=json_answer)
            json = main.json_from_biotools(name, '1.0')
            biotool = main.json_to_biotool(json)
            tmp_file = name + '_tmp_test_cwl.cwl'
            main.write_cwl(biotool,tmp_file)
            tmp_file_list = glob(name + "_tmp_*.cwl")
            print (tmp_file_list)
            try:
                for temp_file in tmp_file_list:
                    if len(tmp_file_list) > 1:
                        cwl_path = os.path.splitext(json_path)[0] + \
                                   str(re.findall(r'\d+', temp_file)[0]) + '.cwl'
                    self.assertTrue(filecmp.cmp(cwl_path,temp_file))
            finally:
                pass
                for temp_file in tmp_file_list:
                    os.remove(temp_file)


###########  Main  ###########
Project: rules_apple    Author: bazelbuild
def _copy_file(self, src, dest, executable, bundle_root):
    """Copies a file into the bundle.

    Args:
      src: The path to the file or directory that should be added.
      dest: The path relative to the bundle root where the file should be
          stored.
      executable: A Boolean value indicating whether or not the file(s) should
          be made executable.
      bundle_root: The bundle root directory into which the files should be
          added.
    """
    full_dest = os.path.join(bundle_root, dest)
    if (os.path.isfile(full_dest) and
        not filecmp.cmp(full_dest, src, shallow=False)):
      raise BundleConflictError(dest)

    self._makedirs_safely(os.path.dirname(full_dest))
    shutil.copy(src, full_dest)
    os.chmod(full_dest, 0o755 if executable else 0o644)
Project: aioweb    Author: kreopt
def recursive_overwrite(src, dest, ignore=None):
    if os.path.isdir(src):
        if not os.path.isdir(dest):
            os.makedirs(dest, exist_ok=True)
        files = os.listdir(src)
        if ignore is not None:
            ignored = ignore(src, files)
        else:
            ignored = set()
        for f in files:
            if f not in ignored:
                recursive_overwrite(os.path.join(src, f),
                                    os.path.join(dest, f),
                                    ignore)
    else:
        if not os.path.exists(dest) or not filecmp.cmp(src, dest):
            print('copy {}'.format(src))
            shutil.copyfile(src, dest)
            shutil.copystat(src, dest)
Project: dvc    Author: dataversioncontrol
def test_import_file_to_dir(self):
        with tempfile.NamedTemporaryFile(mode='w') as temp:
            content = 'Some data'
            temp.write(content)
            temp.flush()

            dir = os.path.join('data', self.dir1)
            settings = copy(self.settings)
            settings.parse_args('import {} {}'.format(temp.name, dir))
            cmd = CmdImportFile(settings)
            cmd.run()

            dvc_name = os.path.join(dir, os.path.basename(temp.name))
            data_item = self.path_factory.existing_data_item(dvc_name)

            self.assertTrue(os.path.exists(data_item.data.relative))
            self.assertTrue(filecmp.cmp(data_item.data.relative, data_item.cache.relative))
            self.assertEqual(open(data_item.data.relative).read(), content)

            self.assertTrue(os.path.exists(data_item.cache.relative))
            self.assertEqual(open(data_item.cache.relative).read(), content)

            self.assertTrue(os.path.exists(data_item.state.relative))
        pass
Project: dvc    Author: dataversioncontrol
def test_copyfile(self):
        src = 'file1'
        dest = 'file2'
        dest_dir = 'testdir'

        with open(src, 'w+') as f:
            f.write('file1contents')

        os.mkdir(dest_dir)

        utils.copyfile(src, dest)
        self.assertTrue(filecmp.cmp(src, dest))

        utils.copyfile(src, dest_dir)
        self.assertTrue(filecmp.cmp(src, '{}/{}'.format(dest_dir, src)))

        shutil.rmtree(dest_dir)
        os.remove(src)
        os.remove(dest)
Project: virt-backup    Author: Anthony25
def test_restore_disk_in_domain(self, get_uncompressed_complete_backup,
                                    build_stopped_mock_domain, tmpdir):
        backup = get_uncompressed_complete_backup
        domain = build_stopped_mock_domain

        src_img = backup.get_complete_path_of(backup.disks["vda"])
        domain.set_storage_basedir(str(tmpdir))
        dst_img = get_domain_disks_of(domain.XMLDesc(), "vda")["vda"]["src"]

        backup.restore_and_replace_disk_of("vda", domain, "vda")

        assert filecmp.cmp(src_img, dst_img)
        assert (
            get_domain_disks_of(domain.XMLDesc())["vda"]["type"] ==
            get_domain_disks_of(backup.dom_xml)["vda"]["type"]
        )
Project: LIMS-Backend    Author: LeafLIMS
def test_fetch(self):
        # Create temp file to copy
        file = open(os.path.join(self._copyFile.copy_from_prefix,
                                 "%sbigFileXYZA" % tempfile.gettempprefix()), 'w')
        file.write("Lots of interesting stuff")
        file.close()
        # Perform copy
        interpolate_dict = {"project_identifier": tempfile.gettempprefix(),
                            "product_identifier": "bigFile",
                            "run_identifier": "XYZ"}
        result_paths = self._copyFile.fetch(interpolate_dict=interpolate_dict)
        # Test copied file exists and DataFile matches
        self.assertEqual(len(result_paths), 1)
        df = result_paths[0]
        self.assertEqual(df.file_name, "%sbigFileXYZB" % tempfile.gettempprefix())
        self.assertEqual(df.location, os.path.join(tempfile.gettempdir(),
                                                   "%sbigFileXYZB" % tempfile.gettempprefix()))
        self.assertEqual(df.equipment, self._equipmentSequencer)
        self.assertIs(filecmp.cmp(
            os.path.join(tempfile.gettempdir(), "%sbigFileXYZA" % tempfile.gettempprefix()),
            os.path.join(tempfile.gettempdir(), "%sbigFileXYZB" % tempfile.gettempprefix())), True)
        # Clean up
        os.remove(os.path.join(tempfile.gettempdir(), "%sbigFileXYZA" % tempfile.gettempprefix()))
        os.remove(os.path.join(tempfile.gettempdir(), "%sbigFileXYZB" % tempfile.gettempprefix()))
Project: reahl    Author: reahl
def test_upload_success(repository_upload_fixture):
    fixture = repository_upload_fixture
    fixture.package.build()

    # Case where it works
    assert not fixture.debian_repository.is_uploaded(fixture.package)
    assert fixture.package.is_built

    fixture.debian_repository.upload(fixture.package, [])
    assert fixture.debian_repository.is_uploaded(fixture.package)
    assert fixture.package.files_to_distribute
    for filename in fixture.package.files_to_distribute:
        filename_only = os.path.basename(filename)
        incoming_filename = os.path.join(fixture.incoming_directory.name, filename_only)
        assert filecmp.cmp(filename, incoming_filename)

    # Case where you try upload something again
    assert fixture.debian_repository.is_uploaded(fixture.package)
    assert fixture.package.is_built
    with expected(AlreadyUploadedException):
        fixture.debian_repository.upload(fixture.package, [])
Project: InternationalizationScript-iOS    Author: alexfeng
def test_create_less_simple_xls(self):
        wb, ws = self.create_simple_xls()
        more_content=[
            [
                u'A{0}'.format(i),
                u'Zażółć gęślą jaźń {0} {1}'.format(i, LOREM_IPSUM),
            ]
            for idx, i in enumerate(range(1000, 1050))
        ]
        for r_idx, content_row in enumerate(more_content, 3):
            for c_idx, cell in enumerate(content_row):
                ws.write(r_idx, c_idx, cell)
        wb.save(in_tst_output_dir('less_simple.xls'))
        self.assertTrue(filecmp.cmp(in_tst_dir('less_simple.xls'),
                                    in_tst_output_dir('less_simple.xls'),
                                    shallow=False))
Project: InternationalizationScript-iOS    Author: alexfeng
def test_create_less_simple_xls(self):
        wb, ws = self.create_simple_xls()
        more_content=[
            [
                u'A{0}'.format(i),
                u'Zażółć gęślą jaźń {0} {1}'.format(i, LOREM_IPSUM),
            ]
            for idx, i in enumerate(range(1000, 1050))
        ]
        for r_idx, content_row in enumerate(more_content, 3):
            for c_idx, cell in enumerate(content_row):
                ws.write(r_idx, c_idx, cell)
        wb.save(in_tst_output_dir('less_simple.xls'))
        self.assertTrue(filecmp.cmp(in_tst_dir('less_simple.xls'),
                                    in_tst_output_dir('less_simple.xls'),
                                    shallow=False))
Project: algochecker-engine    Author: algochecker
def compare(fname1, fname2, binary_mode=False):
    if binary_mode:
        return cmp(fname1, fname2, shallow=False)

    f1 = open(fname1, 'r')
    f2 = open(fname2, 'r')

    while True:
        line1 = f1.readline()
        line2 = f2.readline()

        # if one of files had already ended then for sure it's not equal
        if (not line1 and line2) or (line1 and not line2):
            return False

        # if both files ended at the same time then these are equal
        if not line1 and not line2:
            return True

        # if given line doesn't match then files are not equal
        if line1.strip() != line2.strip():
            return False
Project: PyGeM    Author: mathLab
def test_open_foam_write_comparison(self):
        open_foam_handler = ofh.OpenFoamHandler()
        mesh_points = open_foam_handler.parse(
            'tests/test_datasets/test_openFOAM'
        )
        mesh_points[0] = [-14., 1.55, 0.2]
        mesh_points[1] = [-14.3, 2.55, 0.3]
        mesh_points[2] = [-14.3, 2.55, 0.3]
        mesh_points[2000] = [7.8, -42.8, .0]
        mesh_points[2001] = [8.8, -41.8, .1]
        mesh_points[2002] = [9.8, -40.8, .0]
        mesh_points[-3] = [236.3, 183.7, 0.06]
        mesh_points[-2] = [237.3, 183.7, 0.06]
        mesh_points[-1] = [236.3, 185.7, 0.06]

        outfilename = 'tests/test_datasets/test_openFOAM_out'
        outfilename_expected = 'tests/test_datasets/test_openFOAM_out_true'

        open_foam_handler.write(mesh_points, outfilename)
        self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
        self.addCleanup(os.remove, outfilename)
Project: PyGeM    Author: mathLab
def test_write_parameters_filename_default(self):
        params = rbfp.RBFParameters()
        params.basis = 'gaussian_spline'
        params.radius = 0.5
        params.n_control_points = 8
        params.power = 2
        params.original_control_points = np.array([0., 0., 0., 0., 0., 1., 0., 1., 0., 1., 0., 0., \
         0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1., 1.]).reshape((8, 3))
        params.deformed_control_points = np.array([0., 0., 0., 0., 0., 1., 0., 1., 0., 1., 0., 0., \
         0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1., 1.]).reshape((8, 3))
        outfilename = 'test.prm'
        params.write_parameters(outfilename)
        outfilename_expected = 'tests/test_datasets/parameters_rbf_default.prm'

        print(filecmp.cmp(outfilename, outfilename_expected))
        self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
        os.remove(outfilename)
Project: PyGeM    Author: mathLab
def test_unv_write_comparison_2(self):
        unv_handler = uh.UnvHandler()
        mesh_points = unv_handler.parse('tests/test_datasets/test_square.unv')

        mesh_points[0][0] = 2.2
        mesh_points[5][1] = 4.3
        mesh_points[9][2] = 0.5
        mesh_points[45][0] = 7.2
        mesh_points[132][1] = -1.2
        mesh_points[255][2] = -3.6

        outfilename = 'tests/test_datasets/test_square_out.unv'
        outfilename_expected = 'tests/test_datasets/test_square_out_true.unv'

        unv_handler.write(mesh_points, outfilename)
        self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
        self.addCleanup(os.remove, outfilename)
Project: PyGeM    Author: mathLab
def test_stl_write_comparison(self):
        stl_handler = sh.StlHandler()
        mesh_points = stl_handler.parse('tests/test_datasets/test_sphere.stl')
        mesh_points[0] = [-40.2, -20.5, 60.9]
        mesh_points[1] = [-40.2, -10.5, 60.9]
        mesh_points[2] = [-40.2, -10.5, 60.9]
        mesh_points[500] = [-40.2, -20.5, 60.9]
        mesh_points[501] = [-40.2, -10.5, 60.9]
        mesh_points[502] = [-40.2, -10.5, 60.9]
        mesh_points[1000] = [-40.2, -20.5, 60.9]
        mesh_points[1001] = [-40.2, -10.5, 60.9]
        mesh_points[1002] = [-40.2, -10.5, 60.9]

        outfilename = 'tests/test_datasets/test_sphere_out.stl'
        outfilename_expected = 'tests/test_datasets/test_sphere_out_true.stl'

        stl_handler.write(mesh_points, outfilename)
        self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
        self.addCleanup(os.remove, outfilename)
Project: PyGeM    Author: mathLab
def test_stl_write_ascii_from_binary(self):
        stl_handler = sh.StlHandler()
        mesh_points = stl_handler.parse(
            'tests/test_datasets/test_sphere_bin.stl'
        )
        mesh_points[0] = [-40.2, -20.5, 60.9]
        mesh_points[1] = [-40.2, -10.5, 60.9]
        mesh_points[2] = [-40.2, -10.5, 60.9]
        mesh_points[500] = [-40.2, -20.5, 60.9]
        mesh_points[501] = [-40.2, -10.5, 60.9]
        mesh_points[502] = [-40.2, -10.5, 60.9]
        mesh_points[1000] = [-40.2, -20.5, 60.9]
        mesh_points[1001] = [-40.2, -10.5, 60.9]
        mesh_points[1002] = [-40.2, -10.5, 60.9]

        outfilename = 'tests/test_datasets/test_sphere_out.stl'
        outfilename_expected = 'tests/test_datasets/test_sphere_out_true.stl'

        stl_handler.write(mesh_points, outfilename, write_bin=False)
        self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
        self.addCleanup(os.remove, outfilename)
Project: talkback    Author: Axilent
def test_get_file():
    """ 
    Tests retrieving file from localfile storage.
    """
    setup()
    with open('examples/telco/resources/cave.png', 'rb') as infile:
        stored_filename = media.set_file(infile)
        assert stored_filename == 'cave.png'

    retrieved_file = media.get_file('cave.png')
    assert filecmp.cmp(retrieved_file.name,'examples/telco/resources/cave.png')
Project: talkback    Author: Axilent
def test_get_file():
    """ 
    Tests retrieving file from localfile storage.
    """
    setup()
    with open('examples/telco/resources/cave.png', 'rb') as infile:
        stored_filename = localfile.set_file(infile)
        assert stored_filename == 'cave.png'

    retrieved_file = localfile.get_file('cave.png')
    assert filecmp.cmp(retrieved_file.name,'examples/telco/resources/cave.png')
Project: transfert    Author: rbernand
def test_simple_local_copy(tmpdir):
    src = tmpdir.join('alpha')
    dst = tmpdir.join('beta')

    src.write('some data')
    assert src.check()
    assert not dst.check()
    copy(Resource('file://' + src.strpath),
         Resource('file://' + dst.strpath))
    assert src.check()
    assert dst.check()
    assert filecmp.cmp(src.strpath, dst.strpath)
Project: transfert    Author: rbernand
def test_simple_local_copy_with_callback(tmpdir):
    def wrapper(size):
        nonlocal count
        count += 1
    count = 0
    src = tmpdir.join('alpha')
    dst = tmpdir.join('beta')
    data = b'some data'
    src.write(data)
    chunk_size = 1
    assert src.check()
    assert not dst.check()
    copy(Resource('file://' + src.strpath),
         Resource('file://' + dst.strpath,),
         size=chunk_size,
         callback_freq=1,
         callback=wrapper)
    assert src.check()
    assert dst.check()
    assert filecmp.cmp(src.strpath, dst.strpath)
    assert count == estimate_nb_cycles(len(data), chunk_size)
    dst.remove()
    count = 0
    chunk_size = 2
    assert src.check()
    assert not dst.check()
    copy(Resource('file://' + src.strpath),
         Resource('file://' + dst.strpath,),
         size=chunk_size,
         callback_freq=1,
         callback=wrapper)
    assert src.check()
    assert dst.check()
    assert filecmp.cmp(src.strpath, dst.strpath)
    assert count == estimate_nb_cycles(len(data), chunk_size)
Project: freeradius    Author: epiphyte
def build():
    """Build and apply a user configuration."""
    env = _get_vars("/etc/environment")
    env.validate(full=True)
    os.chdir(env.net_config)
    compose(env)
    new_config = os.path.join(env.net_config, FILE_NAME)
    run_config = os.path.join(env.freeradius_repo, PYTHON_MODS, FILE_NAME)
    diff = filecmp.cmp(new_config, run_config)
    if not diff:
        print('change detected')
        shutil.copyfile(run_config, run_config + ".prev")
        shutil.copyfile(new_config, run_config)
        u = pwd.getpwnam("radiusd")
        os.chown(run_config, u.pw_uid, u.pw_gid)
        update_wiki(env, run_config)
        hashed = get_file_hash(FILE_NAME)
        git = "latest commit"
        git_indicator = env.working_dir + "git"
        if os.path.exists(git_indicator):
            with open(git_indicator, 'r') as f:
                git = f.read().strip()
        status = "ready"
        _smirc("{} -> {} ({})".format(status, git, hashed))
        _feed(env, "radius configuration updated")
    daily_report(env, run_config)
Project: pyecharts-snapshot    Author: chfw
def test_main(self):
        self.fake_popen.return_value.stdout = BytesIO(get_base64_image())
        args = ['snapshot', HTML_FILE]
        with patch.object(sys, 'argv', args):
            main()
        assert(filecmp.cmp('output.png', get_fixture('sample.png')))