Python git.Repo 模块,init() 实例源码

我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用git.Repo.init()

项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def do_clone(self, service=None, repo_path=None):
        """Clone the target repository into a freshly initialised directory.

        Parameters:
            service - service object to start from; looked up with
                      ``self.get_service(lookup_repository=False)`` when falsy
            repo_path - destination directory; defaults to
                        ``self.target_repo`` (or ``self.repo_name``) under
                        ``self.path``

        Returns 0 on success.

        Raises:
            FileExistsError - the destination exists and is not empty
            ResourceNotFoundError - anything went wrong while cloning; the
                                    destination directory is removed first
        """
        service = service or self.get_service(lookup_repository=False)
        repo_path = repo_path or os.path.join(self.path, self.target_repo or self.repo_name)
        if os.path.exists(repo_path) and os.listdir(repo_path) != []:
            raise FileExistsError('Cannot clone repository, '
                                  'a folder named {} already exists and '
                                  'is not an empty directory!'.format(repo_path))
        try:
            repository = Repo.init(repo_path)
            service = RepositoryService.get_service(repository, self.target)
            service.clone(self.namespace, self.repo_name, self.branch)
            log.info('Successfully cloned `{}` into `{}`!'.format(
                service.format_path(self.repo_slug),
                repo_path)
            )
            return 0
        except Exception as err:
            if os.path.exists(repo_path):
                shutil.rmtree(repo_path)
            # The third positional argument is used as the message when it is
            # there, but not every exception carries three arguments or a bytes
            # payload. Fall back to the exception itself so the handler cannot
            # fail with IndexError/AttributeError and hide the real cause.
            detail = err.args[2] if len(err.args) > 2 else err
            if isinstance(detail, bytes):
                detail = detail.decode('utf-8', errors='replace')
            raise ResourceNotFoundError(str(detail)) from err
项目:maintain    作者:kylef    | 项目源码 | 文件源码
def test_git_release_without_remote(self):
        """Release 2.0.0 from a repository that has no remote configured.

        The command must succeed, rewrite VERSION, and leave master on a
        'Release 2.0.0' commit that is tagged 2.0.0, with a clean work tree.
        """
        with self.runner.isolated_filesystem():
            with open('VERSION', 'w') as version_file:
                version_file.write('1.0.0\n')

            local_repo = Repo.init()
            local_repo.index.add(['VERSION'])
            local_repo.index.commit('Initial commit')

            outcome = self.runner.invoke(release, ['2.0.0'])
            self.assertIsNone(outcome.exception)
            self.assertEqual(outcome.exit_code, 0)

            with open('VERSION') as version_file:
                self.assertEqual(version_file.read(), '2.0.0\n')

            self.assertEqual(local_repo.refs.master.commit.message, 'Release 2.0.0')
            self.assertEqual(local_repo.tags['2.0.0'].commit, local_repo.refs.master.commit)
            self.assertFalse(local_repo.is_dirty())
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_get_projects(self, tmpdir):
        """Check that get_projects returns a list for a repo with two YAML files."""
        tempest_yaml = u"""---
tempest_git_repo: https://git.openstack.org/openstack/tempest
tempest_git_install_branch: 1493c7f0ba49bfccb9ff8516b10a65d949d7462e
tempest_git_project_group: utility_all
"""
        novnc_yaml = u"""---
novncproxy_git_repo: https://github.com/kanaka/novnc
novncproxy_git_install_branch: da82b3426c27bf1a79f671c5825d68ab8c0c5d9f
novncproxy_git_project_group: nova_console
"""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        (workdir / 'test1.yml').write_text(tempest_yaml, encoding='utf-8')
        (workdir / 'test2.yml').write_text(novnc_yaml, encoding='utf-8')
        git_repo.index.add(['test1.yml', 'test2.yml'])
        git_repo.index.commit("Test")

        found = osa_differ.get_projects(repo_path, 'HEAD')

        assert isinstance(found, list)
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_get_roles(self, tmpdir):
        """Check that get_roles returns a list whose first entry names the role."""
        requirements_name = 'ansible-role-requirements.yml'
        requirements_yaml = u"""
- name: apt_package_pinning
  scm: git
  src: https://github.com/openstack/openstack-ansible-apt_package_pinning
  version: master
"""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        (workdir / requirements_name).write_text(requirements_yaml,
                                                 encoding='utf-8')
        git_repo.index.add([requirements_name])
        git_repo.index.commit("Test")

        found = osa_differ.get_roles(repo_path, 'HEAD', requirements_name)

        assert isinstance(found, list)
        assert found[0][0] == 'apt_package_pinning'
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_commit_range_valid(self, tmpdir):
        """Check that validate_commit_range is truthy for HEAD~1 -> HEAD."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        target = workdir / 'test.txt'
        # Two commits are needed so that HEAD~1 resolves.
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            target.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        outcome = osa_differ.validate_commit_range(repo_path, 'HEAD~1', 'HEAD')

        assert outcome
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_commit_range_flipped(self, tmpdir):
        """Check that validate_commit_range returns 'flip' for HEAD -> HEAD~1."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        target = workdir / 'test.txt'
        # Two commits are needed so that HEAD~1 resolves.
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            target.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        outcome = osa_differ.validate_commit_range(repo_path, 'HEAD', 'HEAD~1')

        assert outcome == 'flip'
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_make_osa_report(self, tmpdir):
        """Check that the OSA header report mentions the old revision and title."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        target = workdir / 'test.txt'
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            target.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        cli_args = osa_differ.create_parser().parse_args(['HEAD~1', 'HEAD'])
        rendered = osa_differ.make_osa_report(repo_path, 'HEAD~1', "HEAD",
                                              cli_args)

        assert "HEAD~1" in rendered
        assert "OpenStack-Ansible Diff Generator" in rendered
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_make_report_old_pin_missing(self, tmpdir):
        """Check that make_report yields an empty string when no old pins exist."""
        workdir = tmpdir.mkdir('test')
        git_repo = Repo.init(str(workdir))
        target = workdir / 'test.txt'
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            target.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        rendered = osa_differ.make_report(
            str(tmpdir),
            [],
            [("test", "http://example.com", "HEAD")],
        )

        assert rendered == ''
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_repo_clone(self, tmpdir, monkeypatch):
        """Check repo_clone with the actual clone and process wait patched out.

        The returned repository must be on master with a clean work tree.
        """
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')

        def fake_clone(x, y):
            # Stand-in for Repo.clone: hands back the prepared path.
            return repo_path

        def fake_wait(*args, **kwargs):
            # Stand-in for AutoInterrupt.wait: always reports success.
            return True

        monkeypatch.setattr("git.repo.base.Repo.clone", fake_clone)
        monkeypatch.setattr("git.cmd.Git.AutoInterrupt.wait", fake_wait)
        cloned = osa_differ.repo_clone(repo_path, "http://example.com")

        assert cloned.active_branch.name == 'master'
        assert not cloned.is_dirty()
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_repo_clone_update(self, tmpdir):
        """Check update_repo when the first path does not exist yet.

        The returned repository must be on master with a clean work tree.
        """
        source_dir = tmpdir.mkdir('test')
        source_path = str(source_dir)
        git_repo = Repo.init(source_path)
        (source_dir / 'test.txt').write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')

        other_dir = tmpdir.mkdir("test2")
        missing_path = "{0}/testrepodoesntexist".format(str(other_dir))
        updated = osa_differ.update_repo(missing_path, source_path)

        assert updated.active_branch.name == 'master'
        assert not updated.is_dirty()
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_repo_update_with_fetch(self, tmpdir, monkeypatch):
        """Check repo_pull with fetch=True while git process calls are stubbed.

        The returned repository must be on master with a clean work tree.
        """
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')
        git_repo.create_remote('origin', url='http://example.com')

        # Stub every git invocation only for the duration of the pull.
        monkeypatch.setattr(
            "git.cmd.Git._call_process", lambda *args, **kwargs: True)
        pulled = osa_differ.repo_pull(repo_path, "http://example.com",
                                      fetch=True)
        monkeypatch.undo()

        assert pulled.active_branch.name == 'master'
        assert not pulled.is_dirty()
项目:pomu    作者:Hummer12007    | 项目源码 | 文件源码
def init_plain_repo(create, repo_path):
    """
    Set up a plain repository and return a Result
    Parameters:
        create - when true, make a new directory and git repo at repo_path,
                 otherwise reuse the directory that is already there
        repo_path - filesystem path of the repository
    """
    if not repo_path:
        return Result.Err('repository path required')

    # Reuse branch: the directory only has to exist.
    if not create:
        if not path.isdir(repo_path):
            return Result.Err('directory not found')
        return init_pomu(repo_path)

    # Create branch: refuse to overwrite an existing directory.
    if path.isdir(repo_path):
        return Result.Err('this repository already exists')
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')
    Repo.init(repo_path)
    try:
        initialised = init_new(repo_path).unwrap()
    except ResultException as e:
        # Roll back the directory created above.
        rmtree(repo_path)
        return Result.Err(str(e))
    return Result.Ok(initialised)
项目:pomu    作者:Hummer12007    | 项目源码 | 文件源码
def init_portage_repo(create, repo, repo_dir):
    """
    Set up a portage repository and return a Result
    Parameters:
        create - when true, make a new git-backed portage repo,
                 otherwise reuse one portage already knows about
        repo - repository name
        repo_dir - parent directory for a newly created repository
    """
    if not repo:
        return Result.Err('repository name required')
    rsets = portage.db[portage.root]['vartree'].settings.repositories

    # Reuse branch: the name must already be registered with portage.
    if not create:
        if repo not in rsets.prepos_order:
            return Result.Err('repository not found')
        return init_pomu(rsets.prepos[repo], repo)

    # Create branch: the name must still be free.
    if repo in rsets.prepos_order:
        return Result.Err('a repository with such name already exists!')
    repo_path = path.join(repo_dir, repo)
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')

    # Register the new repository in portage's repos.conf.
    conf_path = path.join(portage.root, 'etc/portage/repos.conf', 'pomu.conf')
    try:
        with open(conf_path, 'a') as f:
            f.write('[' + repo + ']' + '\n')
            f.write('location = ' + repo_path + '\n')
    except PermissionError:
        rmtree(repo_path)
        return Result.Err('you do not have enough permissions to setup a portage repo')

    Repo.init(repo_path)
    try:
        initialised = init_new(repo_path, repo).unwrap()
    except ResultException as e:
        rmtree(repo_path)
        return Result.Err(str(e))
    return Result.Ok(initialised)
项目:pentagon    作者:reactiveops    | 项目源码 | 文件源码
def __git_init(self):
        """Clone or initialise git in the project's repository directory.

        When ``self._git_repo`` is set it is cloned into
        ``self._repository_directory``; otherwise a new repository is
        initialised there. The result of whichever call ran is returned.
        """
        if not self._git_repo:
            return Repo.init(self._repository_directory)
        return Git().clone(self._git_repo, self._repository_directory)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ambiguous_arg_iteration(self, rw_dir):
        """Iterate commits in a repo that contains a file named like its branch."""
        repo = Repo.init(osp.join(rw_dir, 'test_ambiguous_arg'))
        # The tracked file is called 'master'.
        clashing_file = osp.join(repo.working_tree_dir, 'master')
        touch(clashing_file)
        repo.index.add([clashing_file])
        repo.index.commit('initial commit')
        list(repo.iter_commits(repo.head.ref))  # should fail unless bug is fixed
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_add_file_and_commit(self, rw_dir):
        """Create a repository, add one empty file to it and commit."""
        import git

        repo_dir = osp.join(rw_dir, 'my-new-repo')
        new_file = osp.join(repo_dir, 'new-file')

        repo = git.Repo.init(repo_dir)
        # Create the file empty; only its presence matters for the commit.
        open(new_file, 'wb').close()
        repo.index.add([new_file])
        repo.index.commit("initial commit")

        # ![test_add_file_and_commit]
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def init(self):  # pragma: no cover
        """Remove ``GIT_WORK_TREE`` from the process environment if present.

        The previous form deleted the key whenever ``GIT_WORK_TREE`` *or*
        ``GIT_DIR`` was set, so it raised KeyError when only ``GIT_DIR``
        existed. ``pop`` with a default removes the variable when it is there
        and is a no-op otherwise.

        NOTE(review): ``GIT_DIR`` was tested but never removed by the original
        code and is left untouched here - confirm whether it should be cleared
        as well.
        """
        os.environ.pop('GIT_WORK_TREE', None)
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def do_gist_clone(self):
        """Clone the gist named by ``self.gist_ref`` under ``self.path``.

        The destination directory is named after the last ``/``-separated
        component of the gist reference. Returns 0.
        """
        service = self.get_service(lookup_repository=False)
        gist_name = self.gist_ref.split('/')[-1]
        repo_path = os.path.join(self.path, gist_name)
        service.repository = Repo.init(repo_path)
        service.gist_clone(self.gist_ref)
        log.info('Successfully cloned `{}` into `{}`!'.format(self.gist_ref, repo_path))
        return 0
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def setup_git_popen(self):
        """Prepare a temporary repository and a mocked ``Popen`` for git calls."""
        def popen_without_session(*a, **k):
            # Strip 'start_new_session' before delegating to the mock.
            k.pop('start_new_session', None)
            return self.Popen.Popen(*a, **k)

        # repository mockup (in a temporary place)
        self.repository = Repo.init(self.tempdir.name)
        # setup git command mockup
        self.Popen = MockPopen()
        self.Popen.mock.Popen.side_effect = popen_without_session
        process = self.Popen.mock.Popen_instance
        process.stdin = None
        process.wait = lambda *a, **k: self.Popen.wait()
        # Let the mocked process be used as a context manager.
        process.__enter__ = lambda self: self
        process.__exit__ = lambda self, *a, **k: None
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def main_add(self, repo, rc=0, args={}):
        """Run the ``add`` command against a fresh repository in the temp dir.

        The repository directory is named after the last ``/`` component of
        ``repo`` ('fubar' when ``repo`` is falsy). Asserts that ``main``
        returns ``rc`` and returns the service's ``_did_add`` record.
        """
        create_repo = repo.split('/')[-1] if repo else 'fubar'
        repo_dir = os.path.join(self.tempdir.name, create_repo)
        os.mkdir(repo_dir)
        Repo.init(repo_dir)
        cli_args = self.setup_args({
            'add': True,
            '<user>/<repo>': repo,
            '--path': self.tempdir.name
        }, args)
        assert rc == main(cli_args), "Non {} result for add".format(rc)
        return RepositoryService._current._did_add
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def main_delete(self, repo=None, rc=0, args={}):
        """Run the ``delete`` command, creating the local repository first.

        When ``repo`` is given, a repository named after its last ``/``
        component is initialised in the temp dir. Asserts that ``main``
        returns ``rc`` and returns the service's ``_did_delete`` record.
        """
        if repo:
            repo_name = repo.split('/')[-1]
            repo_path = os.path.join(self.tempdir.name, repo_name)
            os.mkdir(repo_path)
            Repo.init(repo_path)
        cli_args = self.setup_args({
            'delete': True,
            '<user>/<repo>': repo,
            '--path': self.tempdir.name,
        }, args)
        assert rc == main(cli_args), "Non {} result for delete".format(rc)
        return RepositoryService._current._did_delete
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def action_clone(self, namespace, repository):
        """Clone ``namespace/repository`` with git subprocess calls mocked.

        The expected git commands and their canned output are registered
        first; then, under the recorded cassette, the service connects,
        clones and gets the 'all' and service-named remotes created.
        """
        # hijack subprocess call
        with self.mockup_git(namespace, repository):
            local_slug = self.service.format_path(namespace=namespace, repository=repository, rw=True)
            # Canned stderr of the 'git pull' invocation.
            pull_stderr = '\n'.join([
                'POST git-upload-pack (140 bytes)',
                'remote: Counting objects: 8318, done.',
                'remote: Compressing objects: 100% (3/3), done.',
                'remote: Total 8318 (delta 0), reused 0 (delta 0), pack-reused 8315',
                'Receiving objects: 100% (8318/8318), 3.59 MiB | 974.00 KiB/s, done.',
                'Resolving deltas: 100% (5126/5126), done.',
                'From {}:{}/{}'.format(self.service.fqdn, namespace, repository),
                ' * branch            master     -> FETCH_HEAD',
                ' * [new branch]      master     -> {}/master'.format(self.service.name),
            ]).encode('utf-8')
            self.set_mock_popen_commands([
                ('git init', b'Initialized empty Git repository in /tmp/bar/.git/', b'', 0),
                ('git remote add all {}'.format(local_slug), b'', b'', 0),
                ('git remote add {} {}'.format(self.service.name, local_slug), b'', b'', 0),
                ('git version', b'git version 2.8.0', b'', 0),
                ('git pull --progress -v {} master'.format(self.service.name), b'', pull_stderr, 0),
            ])
            with self.recorder.use_cassette(self._make_cassette_name()):
                self.service.connect()
                self.service.clone(namespace, repository)
                for remote_name in ('all', self.service.name):
                    self.service.repository.create_remote(remote_name, url=local_slug)
项目:git-repo    作者:guyzmo    | 项目源码 | 文件源码
def action_add(self, namespace, repository, alone=False, name=None,
            tracking='master', auto_slug=False, remotes={}):
        """Add a remote through the service and verify the resulting remotes.

        Under the recorded cassette: initialise the repository, create the
        pre-existing ``remotes``, connect, call ``service.add`` and then
        assert which remotes were added and, when ``tracking`` is truthy,
        which remote/branch is tracked.
        """
        with self.recorder.use_cassette(self._make_cassette_name()):
            # init git in the repository's destination
            self.repository.init()
            for remote, url in remotes.items():
                self.repository.create_remote(remote, url)
            self.service.connect()
            self.service.add(user=namespace, repo=repository, alone=alone, name=name, tracking=tracking, auto_slug=auto_slug)

            # Which remotes must exist depends only on alone/name.
            if alone:
                self.assert_added_remote(name if name else self.service.name)
            elif name:
                self.assert_added_remote(name)
                self.assert_added_remote('all')
            else:
                self.assert_added_remote_defaults()

            # The tracking assertions only apply when a branch is tracked.
            if not tracking:
                return
            if alone and name:
                self.assert_tracking_remote(name, tracking)
            elif alone:
                self.assert_tracking_remote(branch_name=tracking)
            elif name:
                self.assert_tracking_remote(name)
            else:
                self.assert_tracking_remote()
项目:urban-journey    作者:urbanjourney    | 项目源码 | 文件源码
def init(path=None):
        """Create an empty uj project and, when possible, a git repository.

        Mirrors the directory tree under ``empty_project_dir`` into ``path``
        (the current directory when ``path`` is falsy). Files that already
        exist in the target are left untouched. A freshly copied ``.ujml``
        file gets its ``{version}`` placeholder filled with ``uj_version``.
        If git support is available and the target is not yet a git
        repository, one is initialised there.
        """
        # Initialize current directory if no arguments were given.
        target_directory = path or "./"

        # Walk through the empty source project and copy any missing files.
        for (dir_path, _dir_names, file_names) in os.walk(empty_project_dir):
            # Get relative path to source root.
            rel_path = os.path.relpath(dir_path, empty_project_dir)
            # Get path to current target directory.
            target_path = os.path.normpath(os.path.join(target_directory, rel_path))
            # Create target directory if necessary; makedirs also creates
            # missing parents of the project root, where mkdir would fail.
            if not os.path.isdir(target_path):
                os.makedirs(target_path)
            for file_name in file_names:
                target_file = os.path.join(target_path, file_name)
                # Never touch a file the user already has.
                if os.path.exists(target_file):
                    continue
                copyfile(os.path.join(dir_path, file_name), target_file)
                # Fill in the version number only for files copied just now.
                # Formatting an existing file again would collapse escaped
                # braces or fail on literal ones.
                if file_name.endswith(".ujml"):
                    with open(target_file, "r") as f:
                        content = f.read()
                    with open(target_file, "w") as f:
                        f.write(content.format(version=uj_version))

        # If possible make sure it's a git repository.
        if git_available:
            try:
                Repo(target_directory)
            except InvalidGitRepositoryError:
                Repo.init(target_directory)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def init_package_in_develop(self):
        """Recreate the package's local git repository from its current files.

        Removes any existing ``.git`` and ``deploy`` entries under the package
        path, initialises a new repository there, stages the ``*.py`` and
        ``*.json`` files and records them in one commit.
        """
        # Imported locally so this method does not rely on a module-level import.
        import shutil

        # Delete the stale entries directly instead of building an ``rm -rf``
        # shell command: a path containing spaces or shell metacharacters
        # would be split or interpreted by the shell.
        for stale_name in ('.git', 'deploy'):
            stale_path = osp.join(self.__package_path, stale_name)
            if osp.isdir(stale_path) and not osp.islink(stale_path):
                shutil.rmtree(stale_path, ignore_errors=True)
            elif osp.lexists(stale_path):
                try:
                    os.remove(stale_path)
                except OSError:
                    pass  # best effort, like ``rm -f``

        # init git for make a local repository
        repo = Repo.init(str(self.__package_path))
        repo.git.add('*.py')
        repo.git.add('*.json')
        repo.index.commit('add new files')
项目:django-gitversions    作者:michaelkuty    | 项目源码 | 文件源码
def init(self, path=None):
        '''Initialise the repo object, or clone it when only a URL is known.

        If ``self.path`` is an existing directory, a repository is
        initialised at ``path`` (or ``self.path``). Otherwise, when
        ``self._url`` is set, the directory is created and the master branch
        of that URL is cloned into it. Returns ``self._repo``.
        '''
        if os.path.isdir(self.path):
            self._repo = Repo.init(path or self.path)
        elif self._url:
            # clone the remote when a url is available
            mkdir_p(self.path)
            self._repo = Repo.clone_from(self._url,
                                         self.path,
                                         branch='master')

        return self._repo
项目:django-gitversions    作者:michaelkuty    | 项目源码 | 文件源码
def repo(self):
        '''Return the cached repo object, creating it on first use.

        The first call tries to open ``self.path`` as a repository; if that
        fails, ``self.init()`` is called to set one up instead.
        '''
        if not hasattr(self, '_repo'):
            try:
                self._repo = Repo(self.path)
            except Exception:
                # A bare ``except:`` also swallowed KeyboardInterrupt and
                # SystemExit; only real errors should trigger the fallback.
                self.init()
        return self._repo
项目:dupin    作者:guardian    | 项目源码 | 文件源码
def _initialise_history_repo(repo_path):
    """Initialise a repository at repo_path with a committed README.md."""
    history_repo = Repo.init(repo_path)
    readme_path = os.path.join(repo_path, "README.md")
    with open(readme_path, "w") as readme:
        print(_README(), file=readme)
    history_repo.index.add(["README.md"])
    history_repo.index.commit("Initial commit")
项目:maintain    作者:kylef    | 项目源码 | 文件源码
def __enter__(self):
        """Enter the parent context, initialise a bare repo, return the parent's value."""
        entered_path = super(git_bare_repo, self).__enter__()
        Repo.init(bare=True)
        return entered_path
项目:maintain    作者:kylef    | 项目源码 | 文件源码
def __enter__(self):
        """Enter the parent context and return a newly initialised repository."""
        super(git_repo, self).__enter__()
        new_repo = Repo.init()
        return new_repo
项目:maintain    作者:kylef    | 项目源码 | 文件源码
def test_git_release_with_remote(self):
        """Release 2.0.0 from a repository whose origin is a bare repository.

        Besides the local checks (VERSION rewritten, master on a tagged
        'Release 2.0.0' commit, clean tree), the bare remote must end up
        with the same release commit and tag.
        """
        with git_bare_repo() as bare_repo:
            with temp_directory():
                with open('VERSION', 'w') as version_file:
                    version_file.write('1.0.0\n')

                local_repo = Repo.init()
                local_repo.index.add(['VERSION'])
                local_repo.index.commit('Initial commit')
                local_repo.create_remote('origin', url=bare_repo)
                local_repo.remotes.origin.push(local_repo.refs.master)

                outcome = self.runner.invoke(release, ['2.0.0'])
                self.assertIsNone(outcome.exception)
                self.assertEqual(outcome.exit_code, 0)

                with open('VERSION') as version_file:
                    self.assertEqual(version_file.read(), '2.0.0\n')

                self.assertEqual(local_repo.refs.master.commit.message, 'Release 2.0.0')
                self.assertEqual(local_repo.tags['2.0.0'].commit, local_repo.refs.master.commit)
                self.assertFalse(local_repo.is_dirty())

            # Re-open the bare repository to inspect what was pushed.
            remote_repo = Repo(bare_repo)
            self.assertEqual(remote_repo.commit('master').message, 'Release 2.0.0')
            self.assertEqual(remote_repo.tags['2.0.0'].commit, remote_repo.refs.master.commit)
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_get_commits(self, tmpdir):
        """Check that get_commits yields two commits for HEAD~2 -> HEAD."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        # Ten single-file commits.
        for index in range(10):
            filename = "test{0}.txt".format(index)
            (workdir / filename).write_text(u"Test", encoding='utf-8')
            git_repo.index.add([filename])
            git_repo.index.commit("Commit #{0}".format(index))

        found = osa_differ.get_commits(repo_path, 'HEAD~2', 'HEAD')

        assert len(list(found)) == 2
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_get_commits_hide_merges(self, tmpdir):
        """Check that hide_merges=True filters out commits titled 'Merge #N'."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        # Ten single-file commits whose messages start with "Merge".
        for index in range(10):
            filename = "test{0}.txt".format(index)
            (workdir / filename).write_text(u"Test", encoding='utf-8')
            git_repo.index.add([filename])
            git_repo.index.commit("Merge #{0}".format(index))

        found = osa_differ.get_commits(repo_path, 'HEAD~2', 'HEAD',
                                       hide_merges=True)

        assert len(list(found)) == 0
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_get_commits_include_merges(self, tmpdir):
        """Check that hide_merges=False keeps commits titled 'Merge #N'."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        # Ten single-file commits whose messages start with "Merge".
        for index in range(10):
            filename = "test{0}.txt".format(index)
            (workdir / filename).write_text(u"Test", encoding='utf-8')
            git_repo.index.add([filename])
            git_repo.index.commit("Merge #{0}".format(index))

        found = osa_differ.get_commits(repo_path, 'HEAD~2', 'HEAD',
                                       hide_merges=False)

        assert len(list(found)) == 2
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_commit_invalid(self, tmpdir):
        """Check that validate_commits raises for HEAD~1 in a one-commit repo."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')

        with raises(Exception):
            osa_differ.validate_commits(repo_path, ['HEAD~1'])
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_commit_range_not_valid(self, tmpdir):
        """Check that validate_commit_range raises for HEAD~2 in a two-commit repo."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        target = workdir / 'test.txt'
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            target.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        with raises(Exception):
            osa_differ.validate_commit_range(repo_path, 'HEAD~2', 'HEAD')
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_render_template(self, tmpdir):
        """Check that the offline repo-changes template renders its variables."""
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        # Ten single-file commits to draw the commit list from.
        for index in range(10):
            filename = "test{0}.txt".format(index)
            (workdir / filename).write_text(u"Test", encoding='utf-8')
            git_repo.index.add([filename])
            git_repo.index.commit("Commit #{0}".format(index))

        found = osa_differ.get_commits(repo_path, 'HEAD~2', 'HEAD')

        rendered = osa_differ.render_template(
            "offline-repo-changes.j2",
            {
                'repo': 'openstack-ansible',
                'commits': found,
                'commit_base_url': 'http://example.com',
                'old_sha': 'HEAD~10',
                'new_sha': 'HEAD~1'
            },
        )

        for expected in ("openstack-ansible",
                         "2 commits were found",
                         "http://example.com",
                         "HEAD~10",
                         "HEAD~1"):
            assert expected in rendered
项目:osa_differ    作者:major    | 项目源码 | 文件源码
def test_repo_update_update(self, tmpdir):
        """Check update_repo when both arguments are the same existing repo.

        The returned repository must be on master with a clean work tree.
        """
        workdir = tmpdir.mkdir('test')
        repo_path = str(workdir)
        git_repo = Repo.init(repo_path)
        (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')

        updated = osa_differ.update_repo(repo_path, repo_path)

        assert updated.active_branch.name == 'master'
        assert not updated.is_dirty()
项目:col-aws-clients    作者:collectrium    | 项目源码 | 文件源码
def test_lambda_deployment(self):
        """Package a lambda from a fresh git repo and deploy it twice in a row."""
        function_settings = {
            'role_name': 'lambda_basic_execution',
            'handler': 'lambda_module.lambda_handler',
            'event_sources': {
                'api_gateway': {},
            },
            'ignored_packages': ['ipython', 'pudb']
        }
        aws_lambda_config = {'roma_api_function': function_settings}

        git_repo = Repo.init(path=tempfile.mkdtemp())
        package = LambdaPackage(aws_lambda_config, git_repo.git_dir)
        zip_file = package.create_deployment_package()

        lambda_deployer = LambdaDeployer(
            region_name=self.region_name,
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            zip_file=zip_file,
            version='development',
            aws_lambda_config=aws_lambda_config
        )

        # Two consecutive deployments with the same deployer.
        for _ in range(2):
            lambda_deployer.deploy()
项目:coon    作者:comtihon    | 项目源码 | 文件源码
def set_git_url(path: str, url: str):
    """Initialise a repo at path, commit its contents and point origin at url.

    Every entry listed in ``path`` is added to the first commit, a 'master'
    head is created, and the 'origin' remote is set to ``url`` + '.git'.
    """
    git_repo = Repo.init(path)
    entries = listdir(path)
    git_repo.index.add(entries)
    git_repo.index.commit("First commit")
    git_repo.create_head('master')
    remote_url = url + '.git'
    git_repo.create_remote('origin', url=remote_url)