我们从Python开源项目中，提取了以下40个代码示例，用于说明如何使用git.Repo.init()。
def do_clone(self, service=None, repo_path=None):
    """Clone the configured repository into a freshly initialised local repo.

    Args:
        service: Optional service object; when falsy one is obtained from
            ``self.get_service(lookup_repository=False)``.
        repo_path: Optional destination directory; defaults to ``self.path``
            joined with ``self.target_repo`` (or ``self.repo_name``).

    Returns:
        ``0`` on success.

    Raises:
        FileExistsError: If the destination exists and is not empty.
        ResourceNotFoundError: If initialising or cloning fails; the
            destination directory is removed before raising.
    """
    service = service or self.get_service(lookup_repository=False)
    repo_path = repo_path or os.path.join(self.path, self.target_repo or self.repo_name)
    if os.path.exists(repo_path) and os.listdir(repo_path):
        raise FileExistsError('Cannot clone repository, '
                              'a folder named {} already exists and '
                              'is not an empty directory!'.format(repo_path))
    try:
        repository = Repo.init(repo_path)
        service = RepositoryService.get_service(repository, self.target)
        service.clone(self.namespace, self.repo_name, self.branch)
        log.info('Successfully cloned `{}` into `{}`!'.format(
            service.format_path(self.repo_slug), repo_path)
        )
        return 0
    except Exception as err:
        # Do not leave a half-initialised repository behind.
        if os.path.exists(repo_path):
            shutil.rmtree(repo_path)
        # Only some exceptions carry a bytes payload as their third argument;
        # fall back to the plain message so the real failure is not replaced
        # by an IndexError/AttributeError raised from this handler.
        if len(err.args) > 2:
            detail = err.args[2]
            if isinstance(detail, bytes):
                message = detail.decode('utf-8')
            else:
                message = str(detail)
        else:
            message = str(err)
        raise ResourceNotFoundError(message) from err
def test_git_release_without_remote(self):
    """Run ``release 2.0.0`` in a repository that has no remote configured."""
    with self.runner.isolated_filesystem():
        with open('VERSION', 'w') as version_file:
            version_file.write('1.0.0\n')

        repo = Repo.init()
        repo.index.add(['VERSION'])
        repo.index.commit('Initial commit')

        outcome = self.runner.invoke(release, ['2.0.0'])

        # The command must finish cleanly ...
        self.assertIsNone(outcome.exception)
        self.assertEqual(outcome.exit_code, 0)

        # ... bump the version file ...
        with open('VERSION') as version_file:
            self.assertEqual(version_file.read(), '2.0.0\n')

        # ... and leave a tagged release commit on a clean tree.
        self.assertEqual(repo.refs.master.commit.message, 'Release 2.0.0')
        self.assertEqual(repo.tags['2.0.0'].commit, repo.refs.master.commit)
        self.assertFalse(repo.is_dirty())
def test_get_projects(self, tmpdir):
    """Verify that we can retrieve projects."""
    p = tmpdir.mkdir('test')
    path = str(p)
    repo = Repo.init(path)

    # Each fixture is a YAML document with one key per line; keeping the
    # line breaks is what makes it parse as three separate keys.
    first = p / 'test1.yml'
    first.write_text(u"""---
tempest_git_repo: https://git.openstack.org/openstack/tempest
tempest_git_install_branch: 1493c7f0ba49bfccb9ff8516b10a65d949d7462e
tempest_git_project_group: utility_all
""", encoding='utf-8')

    second = p / 'test2.yml'
    second.write_text(u"""---
novncproxy_git_repo: https://github.com/kanaka/novnc
novncproxy_git_install_branch: da82b3426c27bf1a79f671c5825d68ab8c0c5d9f
novncproxy_git_project_group: nova_console
""", encoding='utf-8')

    repo.index.add(['test1.yml', 'test2.yml'])
    repo.index.commit("Test")

    projects = osa_differ.get_projects(path, 'HEAD')
    assert isinstance(projects, list)
def test_get_roles(self, tmpdir):
    """Verify that we can get OSA role information."""
    p = tmpdir.mkdir('test')
    path = str(p)
    repo = Repo.init(path)

    # The requirements file is a YAML list of mappings; the keys of one role
    # must sit on separate, aligned lines for ``name`` to be read on its own.
    requirements = p / 'ansible-role-requirements.yml'
    requirements.write_text(u"""
- name: apt_package_pinning
  scm: git
  src: https://github.com/openstack/openstack-ansible-apt_package_pinning
  version: master
""", encoding='utf-8')

    repo.index.add(['ansible-role-requirements.yml'])
    repo.index.commit("Test")

    roles = osa_differ.get_roles(path, 'HEAD', 'ansible-role-requirements.yml')
    assert isinstance(roles, list)
    assert roles[0][0] == 'apt_package_pinning'
def test_commit_range_valid(self, tmpdir):
    """Check that an old-to-new commit range is reported as valid."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    target = workdir / 'test.txt'

    # Two commits touching the same file give us HEAD~1 and HEAD.
    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        target.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)

    result = osa_differ.validate_commit_range(path, 'HEAD~1', 'HEAD')
    assert result
def test_commit_range_flipped(self, tmpdir):
    """Check that a new-to-old commit range is reported as 'flip'."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    target = workdir / 'test.txt'

    # Two commits touching the same file give us HEAD~1 and HEAD.
    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        target.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)

    # The range is passed newest-first on purpose.
    result = osa_differ.validate_commit_range(path, 'HEAD', 'HEAD~1')
    assert result == 'flip'
def test_make_osa_report(self, tmpdir):
    """Check the header report produced for a two-commit repository."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    target = workdir / 'test.txt'

    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        target.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)

    cli_args = osa_differ.create_parser().parse_args(['HEAD~1', 'HEAD'])
    report = osa_differ.make_osa_report(path, 'HEAD~1', "HEAD", cli_args)

    assert "HEAD~1" in report
    assert "OpenStack-Ansible Diff Generator" in report
def test_make_report_old_pin_missing(self, tmpdir):
    """Check that an empty report is produced when there are no old pins."""
    workdir = tmpdir.mkdir('test')
    repo = Repo.init(str(workdir))
    target = workdir / 'test.txt'

    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        target.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)

    # "test" is pinned in the new set only; the old set is empty.
    report = osa_differ.make_report(
        str(tmpdir),
        [],
        [("test", "http://example.com", "HEAD")],
    )
    assert report == ''
def test_repo_clone(self, tmpdir, monkeypatch):
    """Check ``repo_clone`` with the actual git clone call patched out."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')

    # Replace the clone call and the process wait with canned results.
    monkeypatch.setattr("git.repo.base.Repo.clone", lambda x, y: path)
    monkeypatch.setattr("git.cmd.Git.AutoInterrupt.wait",
                        lambda *args, **kwargs: True)

    result = osa_differ.repo_clone(path, "http://example.com")

    assert result.active_branch.name == 'master'
    assert not result.is_dirty()
def test_repo_clone_update(self, tmpdir):
    """Check ``update_repo`` when the first path does not exist yet."""
    source_dir = tmpdir.mkdir('test')
    path = str(source_dir)
    repo = Repo.init(path)
    (source_dir / 'test.txt').write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')

    # A path inside a second directory that is never created by the test.
    other_dir = tmpdir.mkdir("test2")
    path_clonefrom = "{0}/testrepodoesntexist".format(str(other_dir))

    result = osa_differ.update_repo(path_clonefrom, path)

    assert result.active_branch.name == 'master'
    assert not result.is_dirty()
def test_repo_update_with_fetch(self, tmpdir, monkeypatch):
    """Check ``repo_pull`` with ``fetch=True`` while git calls are patched."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')
    repo.create_remote('origin', url='http://example.com')

    def always_true(*args, **kwargs):
        return True

    # Patch git's process call only for the duration of the pull.
    monkeypatch.setattr("git.cmd.Git._call_process", always_true)
    result = osa_differ.repo_pull(path, "http://example.com", fetch=True)
    monkeypatch.undo()

    assert result.active_branch.name == 'master'
    assert not result.is_dirty()
def init_plain_repo(create, repo_path):
    """
    Initialize a plain repository

    Parameters:
        create - if true, create a new git repo,
                 else, reuse an existing one
        repo_path - a path for the repository to reside in

    Returns a Result: Ok on success, Err with a message otherwise.
    """
    if not repo_path:
        return Result.Err('repository path required')

    # Reusing an existing repository only needs the directory to be there.
    if not create:
        if not path.isdir(repo_path):
            return Result.Err('directory not found')
        return init_pomu(repo_path)

    if path.isdir(repo_path):
        return Result.Err('this repository already exists')
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')

    Repo.init(repo_path)
    try:
        return Result.Ok(init_new(repo_path).unwrap())
    except ResultException as exc:
        # Roll back the directory created above.
        rmtree(repo_path)
        return Result.Err(str(exc))
def init_portage_repo(create, repo, repo_dir):
    """
    Initialize a portage repository

    Parameters:
        create - if true, create a new portage repo with git,
                 else, reuse an existing one
        repo - name of the repository
        repo_dir - location of the newly created repository, if applicable

    Returns a Result: Ok on success, Err with a message otherwise.
    """
    if not repo:
        return Result.Err('repository name required')

    rsets = portage.db[portage.root]['vartree'].settings.repositories

    # Reusing an existing repository: it must already be known to portage.
    if not create:
        if repo not in rsets.prepos_order:
            return Result.Err('repository not found')
        return init_pomu(rsets.prepos[repo], repo)

    if repo in rsets.prepos_order:
        return Result.Err('a repository with such name already exists!')

    repo_path = path.join(repo_dir, repo)
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')

    # Register the new repository by appending a section to pomu.conf.
    conf_file = path.join(portage.root, 'etc/portage/repos.conf', 'pomu.conf')
    try:
        with open(conf_file, 'a') as f:
            f.write('[' + repo + ']' + '\n')
            f.write('location = ' + repo_path + '\n')
    except PermissionError:
        rmtree(repo_path)
        return Result.Err('you do not have enough permissions to setup a portage repo')

    Repo.init(repo_path)
    try:
        return Result.Ok(init_new(repo_path, repo).unwrap())
    except ResultException as exc:
        rmtree(repo_path)
        return Result.Err(str(exc))
def __git_init(self):
    """
    Set up git in the project infrastructure path.

    Clones ``self._git_repo`` into ``self._repository_directory`` when a
    repository is configured; otherwise initialises a new repository there.
    Returns whatever the chosen call returns.
    """
    if not self._git_repo:
        return Repo.init(self._repository_directory)
    return Git().clone(self._git_repo, self._repository_directory)
def test_ambiguous_arg_iteration(self, rw_dir):
    """Iterate commits in a repo whose work tree contains a file named 'master'."""
    repo = Repo.init(osp.join(rw_dir, 'test_ambiguous_arg'))

    # Commit a file called 'master' inside the working tree.
    ambiguous_file = osp.join(repo.working_tree_dir, 'master')
    touch(ambiguous_file)
    repo.index.add([ambiguous_file])
    repo.index.commit('initial commit')

    # should fail unless bug is fixed
    list(repo.iter_commits(repo.head.ref))
def test_add_file_and_commit(self, rw_dir):
    """Create a repository, add one empty file and commit it."""
    import git

    repo_dir = osp.join(rw_dir, 'my-new-repo')
    file_name = osp.join(repo_dir, 'new-file')

    r = git.Repo.init(repo_dir)
    # This just creates an empty file ...
    with open(file_name, 'wb'):
        pass
    r.index.add([file_name])
    r.index.commit("initial commit")
    # ![test_add_file_and_commit]
def init(self):  # pragma: no cover
    """Remove ``GIT_WORK_TREE`` from the process environment, if present.

    ``pop`` with a default is used instead of ``del`` so that an environment
    where only ``GIT_DIR`` is set no longer raises ``KeyError``.
    """
    # NOTE(review): GIT_DIR itself is left untouched here, as before --
    # confirm whether it is meant to be cleared as well.
    os.environ.pop('GIT_WORK_TREE', None)
def do_gist_clone(self):
    """Clone the gist named by ``self.gist_ref`` below ``self.path``.

    The destination directory is named after the last ``/``-separated part
    of the gist reference. Returns ``0``.
    """
    service = self.get_service(lookup_repository=False)

    gist_dir = self.gist_ref.split('/')[-1]
    repo_path = os.path.join(self.path, gist_dir)

    service.repository = Repo.init(repo_path)
    service.gist_clone(self.gist_ref)

    message = 'Successfully cloned `{}` into `{}`!'.format(self.gist_ref, repo_path)
    log.info(message)
    return 0
def setup_git_popen(self):
    """Create a repository in the temp dir and install the Popen mock-up."""
    # repository mockup (in a temporary place)
    self.repository = Repo.init(self.tempdir.name)

    # setup git command mockup
    self.Popen = MockPopen()

    def FixPopen(*a, **k):
        # Drop the 'start_new_session' keyword before delegating to the mock.
        k.pop('start_new_session', None)
        return self.Popen.Popen(*a, **k)

    def wait_via_mock(*a, **k):
        return self.Popen.wait()

    self.Popen.mock.Popen.side_effect = FixPopen
    self.Popen.mock.Popen_instance.stdin = None
    self.Popen.mock.Popen_instance.wait = wait_via_mock
    # Let the mocked instance be used in a ``with`` statement.
    self.Popen.mock.Popen_instance.__enter__ = lambda this: this
    self.Popen.mock.Popen_instance.__exit__ = lambda this, *a, **k: None
def main_add(self, repo, rc=0, args=None):
    """Run the ``add`` command against a fresh repository in the temp dir.

    Args:
        repo: ``<user>/<repo>`` slug; its last path component names the
            directory to create. When falsy, ``'fubar'`` is used.
        rc: Return code ``main`` is expected to yield.
        args: Extra arguments handed to ``self.setup_args``; defaults to a
            new empty dict on every call.

    Returns:
        ``RepositoryService._current._did_add``.
    """
    # A fresh dict per call: a shared ``{}`` default would carry over any
    # modification made to it between calls.
    args = {} if args is None else args

    create_repo = repo.split('/')[-1] if repo else 'fubar'
    repo_path = os.path.join(self.tempdir.name, create_repo)
    os.mkdir(repo_path)
    Repo.init(repo_path)

    assert rc == main(self.setup_args({
        'add': True,
        '<user>/<repo>': repo,
        '--path': self.tempdir.name
    }, args)), "Non {} result for add".format(rc)
    return RepositoryService._current._did_add
def main_delete(self, repo=None, rc=0, args=None):
    """Run the ``delete`` command, optionally against a fresh repository.

    Args:
        repo: ``<user>/<repo>`` slug; when given, a repository named after
            its last path component is created in the temp dir first.
        rc: Return code ``main`` is expected to yield.
        args: Extra arguments handed to ``self.setup_args``; defaults to a
            new empty dict on every call.

    Returns:
        ``RepositoryService._current._did_delete``.
    """
    # A fresh dict per call: a shared ``{}`` default would carry over any
    # modification made to it between calls.
    args = {} if args is None else args

    if repo:
        repo_path = os.path.join(self.tempdir.name, repo.split('/')[-1])
        os.mkdir(repo_path)
        Repo.init(repo_path)

    assert rc == main(self.setup_args({
        'delete': True,
        '<user>/<repo>': repo,
        '--path': self.tempdir.name,
    }, args)), "Non {} result for delete".format(rc)
    return RepositoryService._current._did_delete
def action_clone(self, namespace, repository):
    """Drive ``service.clone`` against scripted git command results."""
    # hijack subprocess call
    with self.mockup_git(namespace, repository):
        local_slug = self.service.format_path(namespace=namespace, repository=repository, rw=True)

        # Text the scripted ``git pull`` reports on its second output stream.
        pull_progress = '\n'.join([
            'POST git-upload-pack (140 bytes)',
            'remote: Counting objects: 8318, done.',
            'remote: Compressing objects: 100% (3/3), done.',
            'remote: Total 8318 (delta 0), reused 0 (delta 0), pack-reused 8315',
            'Receiving objects: 100% (8318/8318), 3.59 MiB | 974.00 KiB/s, done.',
            'Resolving deltas: 100% (5126/5126), done.',
            'From {}:{}/{}'.format(self.service.fqdn, namespace, repository),
            ' * branch master -> FETCH_HEAD',
            ' * [new branch] master -> {}/master'.format(self.service.name)]).encode('utf-8')

        # One tuple per expected command; the last element of each is 0.
        expected_commands = [
            ('git init', b'Initialized empty Git repository in /tmp/bar/.git/', b'', 0),
            ('git remote add all {}'.format(local_slug), b'', b'', 0),
            ('git remote add {} {}'.format(self.service.name, local_slug), b'', b'', 0),
            ('git version', b'git version 2.8.0', b'', 0),
            ('git pull --progress -v {} master'.format(self.service.name), b'', pull_progress, 0),
        ]
        self.set_mock_popen_commands(expected_commands)

        with self.recorder.use_cassette(self._make_cassette_name()):
            self.service.connect()
            self.service.clone(namespace, repository)
            self.service.repository.create_remote('all', url=local_slug)
            self.service.repository.create_remote(self.service.name, url=local_slug)
def action_add(self, namespace, repository, alone=False, name=None, tracking='master', auto_slug=False, remotes={}):
    """Run ``service.add`` and assert the remotes it is expected to create.

    Pre-existing remotes from ``remotes`` (name -> url) are created first.
    Which remotes are then asserted depends on ``alone`` and ``name``; when
    ``tracking`` is truthy a tracking assertion is made as well.

    NOTE(review): ``remotes={}`` is a shared mutable default; it is only
    iterated here, never modified.
    """
    with self.recorder.use_cassette(self._make_cassette_name()):
        # init git in the repository's destination
        self.repository.init()
        for remote, url in remotes.items():
            self.repository.create_remote(remote, url)
        self.service.connect()
        self.service.add(user=namespace, repo=repository, alone=alone, name=name, tracking=tracking, auto_slug=auto_slug)
        # Without tracking only the added remotes are checked.
        if not tracking:
            if not alone and not name:
                self.assert_added_remote_defaults()
            elif not alone and name:
                self.assert_added_remote(name)
                self.assert_added_remote('all')
            elif alone and not name:
                self.assert_added_remote(self.service.name)
            elif alone and name:
                self.assert_added_remote(name)
        # With tracking each case additionally checks the tracking remote.
        else:
            if not alone and not name:
                self.assert_added_remote_defaults()
                self.assert_tracking_remote()
            elif not alone and name:
                self.assert_added_remote(name)
                self.assert_added_remote('all')
                self.assert_tracking_remote(name)
            elif alone and not name:
                self.assert_added_remote(self.service.name)
                self.assert_tracking_remote(branch_name=tracking)
            elif alone and name:
                self.assert_added_remote(name)
                self.assert_tracking_remote(name, tracking)
def init(path=None):
    """Create an empty uj project.

    Copies every file of the empty-project template that does not exist yet
    into the target directory and, when git is available and the target is
    not already a repository, initialises one.

    Args:
        path: Target directory; defaults to the current directory.
    """
    # Initialize the current directory if no argument was given.
    target_directory = path or "./"

    # Walk through the template directory and copy any missing files.
    for (dir_path, dir_names, file_names) in os.walk(empty_project_dir):
        # Path of the current template directory relative to the template root.
        rel_path = os.path.relpath(dir_path, empty_project_dir)
        # Matching directory inside the target.
        target_path = os.path.normpath(os.path.join(target_directory, rel_path))

        # Create the target directory if necessary. ``makedirs`` also creates
        # missing parents, so a nested, not yet existing target works too.
        if not os.path.isdir(target_path):
            os.makedirs(target_path)

        # Create each file if necessary.
        for file_name in file_names:
            target_file = os.path.join(target_path, file_name)
            if not os.path.exists(target_file):
                copyfile(os.path.join(dir_path, file_name), target_file)

                # A freshly copied ujml file gets the version number filled in.
                if file_name.endswith(".ujml"):
                    with open(target_file, "r") as f:
                        content = f.read()
                    with open(target_file, "w") as f:
                        f.write(content.format(version=uj_version))

    # If possible make sure it's a git repository.
    if git_available:
        try:
            Repo(target_directory)
        except InvalidGitRepositoryError:
            Repo.init(target_directory)
def init_package_in_develop(self):
    """Reset the package directory to a fresh local git repository.

    Removes any existing ``.git`` and ``deploy`` entries, initialises a new
    repository in the package path and commits the ``*.py`` and ``*.json``
    files.
    """
    import shutil  # local import: not needed elsewhere in this block

    def _discard(target):
        # Same effect as ``rm -rf`` without going through a shell, so paths
        # containing spaces or shell metacharacters are handled correctly.
        if osp.isdir(target) and not osp.islink(target):
            shutil.rmtree(target, ignore_errors=True)
        elif osp.lexists(target):
            try:
                os.remove(target)
            except OSError:
                pass

    # delete .git if exist
    _discard(osp.join(self.__package_path, '.git'))
    _discard(osp.join(self.__package_path, 'deploy'))

    # init git for make a local repository
    repo = Repo.init(str(self.__package_path))
    repo.git.add('*.py')
    repo.git.add('*.json')
    repo.index.commit('add new files')
def init(self, path=None):
    '''Initialise the repo object, or init/clone a new Git repo.

    When ``self.path`` is an existing directory a repository is initialised
    in ``path`` (or ``self.path``). Otherwise, if a URL is configured, the
    directory is created and the URL's ``master`` branch is cloned into it.
    Returns ``self._repo``.
    '''
    if os.path.isdir(self.path):
        self._repo = Repo.init(path or self.path)
    elif self._url:
        # there is a URL: clone the remote
        mkdir_p(self.path)
        self._repo = Repo.clone_from(self._url, self.path, branch='master')
    return self._repo
def repo(self):
    '''Return the repo object, creating it on first use.

    Tries to open the repository at ``self.path``; if that fails,
    ``self.init()`` is called instead. The result is cached in
    ``self._repo``.
    '''
    if not hasattr(self, '_repo'):
        try:
            self._repo = Repo(self.path)
        except Exception:
            # Any ordinary failure to open falls back to init(); unlike a
            # bare ``except``, this lets KeyboardInterrupt and SystemExit
            # propagate.
            self.init()
    return self._repo
def _initialise_history_repo(repo_path):
    """Initialise a repository at *repo_path* with a committed README.md."""
    repo = Repo.init(repo_path)

    readme_path = os.path.join(repo_path, "README.md")
    with open(readme_path, "w") as readme:
        print(_README(), file=readme)

    repo.index.add(["README.md"])
    repo.index.commit("Initial commit")
def __enter__(self):
    """Enter the parent context, initialise a bare repo, return the parent's value.

    No path is passed to ``Repo.init``.
    NOTE(review): presumably the parent context has already changed into the
    target directory -- confirm.
    """
    entered_path = super(git_bare_repo, self).__enter__()
    Repo.init(bare=True)
    return entered_path
def __enter__(self):
    """Enter the parent context, then initialise and return a repository.

    No path is passed to ``Repo.init``.
    """
    super(git_repo, self).__enter__()
    new_repo = Repo.init()
    return new_repo
def test_git_release_with_remote(self):
    """Run ``release 2.0.0`` in a repository with an 'origin' remote."""
    with git_bare_repo() as bare_repo, temp_directory():
        with open('VERSION', 'w') as version_file:
            version_file.write('1.0.0\n')

        # Local repository with one commit pushed to the bare remote.
        repo = Repo.init()
        repo.index.add(['VERSION'])
        repo.index.commit('Initial commit')
        repo.create_remote('origin', url=bare_repo)
        repo.remotes.origin.push(repo.refs.master)

        outcome = self.runner.invoke(release, ['2.0.0'])
        self.assertIsNone(outcome.exception)
        self.assertEqual(outcome.exit_code, 0)

        with open('VERSION') as version_file:
            self.assertEqual(version_file.read(), '2.0.0\n')

        # Local side: release commit, tag and a clean tree.
        self.assertEqual(repo.refs.master.commit.message, 'Release 2.0.0')
        self.assertEqual(repo.tags['2.0.0'].commit, repo.refs.master.commit)
        self.assertFalse(repo.is_dirty())

        # Remote side: the same commit and tag must be present.
        remote_repo = Repo(bare_repo)
        self.assertEqual(remote_repo.commit('master').message, 'Release 2.0.0')
        self.assertEqual(remote_repo.tags['2.0.0'].commit, remote_repo.refs.master.commit)
def test_get_commits(self, tmpdir):
    """Check that two commits are returned for HEAD~2..HEAD."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)

    # Ten commits, each adding one new file.
    for number in range(0, 10):
        file_name = "test{0}.txt".format(number)
        (workdir / file_name).write_text(u"Test", encoding='utf-8')
        repo.index.add([file_name])
        repo.index.commit("Commit #{0}".format(number))

    commits = osa_differ.get_commits(path, 'HEAD~2', 'HEAD')
    assert len(list(commits)) == 2
def test_get_commits_hide_merges(self, tmpdir):
    """Check that 'Merge ...' commits are dropped with ``hide_merges=True``."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)

    # Ten commits whose messages all start with "Merge".
    for number in range(0, 10):
        file_name = "test{0}.txt".format(number)
        (workdir / file_name).write_text(u"Test", encoding='utf-8')
        repo.index.add([file_name])
        repo.index.commit("Merge #{0}".format(number))

    commits = osa_differ.get_commits(path, 'HEAD~2', 'HEAD', hide_merges=True)
    assert len(list(commits)) == 0
def test_get_commits_include_merges(self, tmpdir):
    """Check that 'Merge ...' commits are kept with ``hide_merges=False``."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)

    # Ten commits whose messages all start with "Merge".
    for number in range(0, 10):
        file_name = "test{0}.txt".format(number)
        (workdir / file_name).write_text(u"Test", encoding='utf-8')
        repo.index.add([file_name])
        repo.index.commit("Merge #{0}".format(number))

    commits = osa_differ.get_commits(path, 'HEAD~2', 'HEAD', hide_merges=False)
    assert len(list(commits)) == 2
def test_commit_invalid(self, tmpdir):
    """Check that validating a non-existent commit raises."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')

    # Only one commit exists, so HEAD~1 cannot be resolved.
    with raises(Exception):
        osa_differ.validate_commits(path, ['HEAD~1'])
def test_commit_range_not_valid(self, tmpdir):
    """Check that a range starting before the first commit raises."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    target = workdir / 'test.txt'

    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        target.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)

    # Only two commits exist, so HEAD~2 cannot be resolved.
    with raises(Exception):
        osa_differ.validate_commit_range(path, 'HEAD~2', 'HEAD')
def test_render_template(self, tmpdir):
    """Check the text rendered from the offline repo-changes template."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)

    # Ten commits, each adding one new file.
    for number in range(0, 10):
        file_name = "test{0}.txt".format(number)
        (workdir / file_name).write_text(u"Test", encoding='utf-8')
        repo.index.add([file_name])
        repo.index.commit("Commit #{0}".format(number))

    commits = osa_differ.get_commits(path, 'HEAD~2', 'HEAD')
    template_vars = {
        'repo': 'openstack-ansible',
        'commits': commits,
        'commit_base_url': 'http://example.com',
        'old_sha': 'HEAD~10',
        'new_sha': 'HEAD~1'
    }
    rst = osa_differ.render_template("offline-repo-changes.j2", template_vars)

    # Every one of these fragments has to show up in the rendered output.
    for fragment in ("openstack-ansible",
                     "2 commits were found",
                     "http://example.com",
                     "HEAD~10",
                     "HEAD~1"):
        assert fragment in rst
def test_repo_update_update(self, tmpdir):
    """Check ``update_repo`` when both paths name the same existing repo."""
    workdir = tmpdir.mkdir('test')
    path = str(workdir)
    repo = Repo.init(path)
    (workdir / 'test.txt').write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')

    result = osa_differ.update_repo(path, path)

    assert result.active_branch.name == 'master'
    assert not result.is_dirty()
def test_lambda_deployment(self):
    """Package a lambda from a fresh repo and deploy it twice in a row."""
    function_config = {
        'role_name': 'lambda_basic_execution',
        'handler': 'lambda_module.lambda_handler',
        'event_sources': {
            'api_gateway': {},
        },
        'ignored_packages': ['ipython', 'pudb']
    }
    aws_lambda_config = {'roma_api_function': function_config}

    git_repo = Repo.init(path=tempfile.mkdtemp())
    package = LambdaPackage(aws_lambda_config, git_repo.git_dir)
    zip_file = package.create_deployment_package()

    lambda_deployer = LambdaDeployer(
        region_name=self.region_name,
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
        zip_file=zip_file,
        version='development',
        aws_lambda_config=aws_lambda_config
    )
    # Deploy two times with the same deployer.
    for _ in range(2):
        lambda_deployer.deploy()
def set_git_url(path: str, url: str):
    """Initialise a repo at *path*, commit its entries and add an 'origin' remote.

    The remote URL is *url* with ``.git`` appended.
    """
    repo = Repo.init(path)

    # Stage every entry listed in the directory, then make the first commit.
    entries = listdir(path)
    repo.index.add(entries)
    repo.index.commit("First commit")

    repo.create_head('master')
    remote_url = url + '.git'
    repo.create_remote('origin', url=remote_url)