我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用git.Repo()。
def git_push(): """ Git Push handler """ app = get_app() if not have_git: session.flash = GIT_MISSING redirect(URL('site')) form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY())) form.element('input[type=submit]')['_value'] = T('Push') form.add_button(T('Cancel'), URL('site')) form.process() if form.accepted: try: repo = git.Repo(os.path.join(apath(r=request), app)) index = repo.index index.add([apath(r=request) + app + '/*']) new_commit = index.commit(form.vars.changelog) origin = repo.remotes.origin origin.push() session.flash = T( "Git repo updated with latest application changes.") redirect(URL('site')) except git.UnmergedEntriesError: session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.") redirect(URL('site')) return dict(app=app, form=form)
def make_testing_branch(): repo = Repo('.') gitobj = repo.git # create 'test' branch if it doesn't exist so that it can be used for tests in this module git_branch_string = gitobj.branch() git_branch_list = git_branch_string.split("\n") clean_branch_list = [] for branch in git_branch_list: branch = branch.replace('*', '') branch = branch.replace(' ', '') clean_branch_list.append(branch) if 'testing_branch' in clean_branch_list: pass else: gitobj.branch('testing_branch') # deletes the temporary new git branch (testing_branch) for testing
def delete_testing_branch(): repo = Repo('.') gitobj = repo.git # create 'test' branch if it doesn't exist so that it can be used for tests in this module git_branch_string = gitobj.branch() git_branch_list = git_branch_string.split("\n") clean_branch_list = [] for branch in git_branch_list: branch = branch.replace('*', '') branch = branch.replace(' ', '') clean_branch_list.append(branch) if 'testing_branch' in clean_branch_list: gitobj.branch('-d', 'testing_branch') # /////////////////////////////////////////////////////// # # pytest capsys capture tests # confirms capture of std output and std error streams # # ///////////////////////////////////////////////////////
def delete_testing_branch(): repo = Repo('.') gitobj = repo.git # create 'test' branch if it doesn't exist so that it can be used for tests in this module git_branch_string = gitobj.branch() git_branch_list = git_branch_string.split("\n") clean_branch_list = [] for branch in git_branch_list: branch = branch.replace('*', '') branch = branch.replace(' ', '') clean_branch_list.append(branch) if 'testing_branch' in clean_branch_list: gitobj.branch('-d', 'testing_branch') # /////////////////////////////////////////////////////// # # Diff class tests # # ///////////////////////////////////////////////////////
def clone(self, name, user): """ create a clone of self with a given name, owned by user """ self.expiration = None # create the branch on the database new_branch = Branch(name, self.project, self, user) db.session.add(new_branch) db.session.commit() # clone repository in file system branch_path = os.path.abspath(join(os.getcwd(), 'repos', self.project.name, name, 'source')) self.get_repo().clone(branch_path, branch=self.name) os.symlink(os.path.abspath(join('repos', self.project.name, '_resources/low_resolution')), join(branch_path, '_resources')) branch_repo = git.Repo(branch_path) branch_repo.git.checkout('HEAD', b=name) config_repo(branch_repo, user.username, user.email) # build the source new_branch.build(timeout=60) return new_branch
def git_log(repo, prev_head): r = git.Repo(repo) if r.head.object.hexsha == prev_head: return [] git_commit_fields = ['id', 'author_name', 'author_email', 'date', 'message'] git_log_format = ['%H', '%an', '%ae', '%ad', '%s'] git_log_format = '%x1f'.join(git_log_format) + '%x1e' p = Popen('cd %s && git log %s..HEAD --format="%s"' % (repo, prev_head, git_log_format), shell=True, stdout=PIPE) (log, _) = p.communicate() log = log.strip('\n\x1e').split("\x1e") log = [row.strip().split("\x1f") for row in log] log = [dict(zip(git_commit_fields, row)) for row in log] return log
def process(self): """Initialises Mimiron using the configuration found in `config_path`.""" for i, repo in enumerate(self.data['terraformRepositories']): repo['path'] = os.path.expanduser(repo['path']) try: git_repo = Repo(repo['path']) if git_repo.bare: raise _InvalidGitRepositoryError repo['defaultEnvironment'] = repo.get('defaultEnvironment', None) repo['tagEnvironment'] = repo.get('tagEnvironment', None) repo['git'] = git_repo repo['tfvars'] = self._read_tfvars(repo) except _InvalidGitRepositoryError: raise InvalidGitRepository(repo['path']) except _NoSuchPathError: raise DeploymentRepositoriesNotSpecified
def render(self, context): try: r = Repo(BASE_DIR) version = r.rev_parse("HEAD").hexsha context['git'] = { "shortHexSha": version[0:7], "hexSha": version, "shortRemote": "https://github.com", "remote": "https://github.com/rubienr/network-monitoring" } except Exception: context['git'] = { "shortHexSha": None, "hexSha": None, "shortRemote": None, "remote": None, } return ""
def download_beard(beard_name, upgrade): beard_details = find_record(beard_name) git_ = git.Git("beard_cache") print("Attempting to clone from {}...".format(beard_details['git_url'])) try: repo_dir = join("beard_cache", beard_name) os.makedirs(repo_dir) repo = git.Repo() repo.clone_from(beard_details['git_url'], repo_dir) print("Done!") except FileExistsError: repo = git.Repo("beard_cache/{}".format(beard_name)) if upgrade: print("Updating repo") # Should be origin, since no-one should add another remote. repo.remotes.origin.pull() print("Done!") else: print("Repo already exists. Nothing to do.")
def test_rename(self, rwdir): parent = git.Repo.init(osp.join(rwdir, 'parent')) sm_name = 'mymodules/myname' sm = parent.create_submodule(sm_name, sm_name, url=self._small_repo_url()) parent.index.commit("Added submodule") assert sm.rename(sm_name) is sm and sm.name == sm_name assert not sm.repo.is_dirty(index=True, working_tree=False, untracked_files=False) new_path = 'renamed/myname' assert sm.move(new_path).name == new_path new_sm_name = "shortname" assert sm.rename(new_sm_name) is sm assert sm.repo.is_dirty(index=True, working_tree=False, untracked_files=False) assert sm.exists() sm_mod = sm.module() if osp.isfile(osp.join(sm_mod.working_tree_dir, '.git')) == sm._need_gitfile_submodules(parent.git): assert sm_mod.git_dir.endswith(join_path_native('.git', 'modules', new_sm_name)) # end
def _clone_repo(cls, repo, url, path, name, **kwargs): """:return: Repo instance of newly cloned repository :param repo: our parent repository :param url: url to clone from :param path: repository-relative path to the submodule checkout location :param name: canonical of the submodule :param kwrags: additinoal arguments given to git.clone""" module_abspath = cls._module_abspath(repo, path, name) module_checkout_path = module_abspath if cls._need_gitfile_submodules(repo.git): kwargs['separate_git_dir'] = module_abspath module_abspath_dir = osp.dirname(module_abspath) if not osp.isdir(module_abspath_dir): os.makedirs(module_abspath_dir) module_checkout_path = osp.join(repo.working_tree_dir, path) # end clone = git.Repo.clone_from(url, module_checkout_path, **kwargs) if cls._need_gitfile_submodules(repo.git): cls._write_git_file_and_module_config(module_checkout_path, module_abspath) # end return clone
def module(self): """:return: Repo instance initialized from the repository at our submodule path :raise InvalidGitRepositoryError: if a repository was not available. This could also mean that it was not yet initialized""" # late import to workaround circular dependencies module_checkout_abspath = self.abspath try: repo = git.Repo(module_checkout_abspath) if repo != self.repo: return repo # END handle repo uninitialized except (InvalidGitRepositoryError, NoSuchPathError): raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath) else: raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath) # END handle exceptions
def _get_org_and_name_from_remote(remote): """ Gets the org name and the repo name for a github remote. :param git.Remote remote: :return: The owner and the repo name in that order :rtype: unicode, unicode """ url = remote.config_reader.get('url') if not url.endswith('.git'): url = '{0}.git'.format(url) git_info = parse(url) _LOG.debug('Repo owner: "{0}"'.format(git_info.owner)) _LOG.debug('Repo name: "{0}"'.format(git_info.repo)) return git_info.owner, git_info.repo
def _create_pull_request_helper(repo, remote_repo, branch, base, auth=None): """ Creates a pull request to merge the branch into the base. Ignores the error if there is already a pull request open for the given branch->base :param Repo repo: :param Repository remote_repo: :param unicode branch: :param unicode base: """ title, message = construct_message(repo, base, branch) try: _LOG.info('Creating pull request') return remote_repo.create_pull(title=title, head=branch, base=base, body='Autogenerated: \n\n{0}'.format(message)) except GithubException as exc: if 'errors' in exc.data and len(exc.data['errors']) == 1 and \ exc.data['errors'][0].get('message', '').startswith('A pull request already exists for'): _LOG.warning('A pull request already exists for "{0}". Continuing.'.format(branch)) return _get_pull_request(remote_repo, branch, base) else: raise exc
def _get_relevant_commits(repo, commit_shas, branch): """ Gets all commits on the repo with the given shas :param Repo repo: :param list[unicode] commit_shas: :param unicode branch: :return: list[Commit] """ remaining_shas = set(commit_shas) commits = list() for commit in repo.iter_commits(branch): if commit.hexsha in remaining_shas: commits.append(commit) remaining_shas.remove(commit.hexsha) return commits
def _get_remote(repo, name): """ Gets the remote object raising a MissingRemoteException when it does not exist :param Repo repo: :param unicode name: The remote name :return: The remote object :rtype: git.remote.Remote :raises: MissingRemoteException """ try: return repo.remotes[name] except IndexError: # I have no idea why they raise an IndexError instead of KeyError raise MissingRemoteException('The remote "{0}" does not exist. ' 'Please select a different remote or' ' add it using "git remote add" command')
def push_repo(repo, branch, remote='origin', remote_branch=None): """ Pushes the repo up to the remote. It will set the upstream to the remote branch. :param Repo repo: The repo you wish to push :param unicode branch: The branch being pushed :param unicode remote: The remote to use (e.g. ``'origin'`` :param unicode remote_branch: The remote branch. Defaults to the `branch` parameter :return: None :raises: PushFailedException """ remote_branch = remote_branch or branch _checkout_branch(repo, branch) _LOG.info("Pushing all commits to remote '{0}'".format(remote)) remote = _get_remote(repo, remote) try: remote.push(remote_branch, set_upstream=True) except GitCommandError as e: _LOG.error(str(e)) raise PushFailedException('Uh oh, it seems like something went' ' wrong with the push. "{0}"'.format(str(e)))
def check_multiple_branches(repository): """Check whether a git repository has more than one branch. Parameters ---------- repository : string Path to a git repository Results ------- boolean True if the repository has more than 1 branch, False otherwise Raises ------ git.InvalidGitRepositoryError if repository is a path to a directory but not a git repository git.NoSuchPathError if repository is not a path to a directory """ repo = git.Repo(repository) branches = repo.branches if len(branches) > 1: return True else: return False
def __init__(self, root, testMode=False): super(GitManager, self).__init__() self.root = root self.commitTable = {} # ??????????(submodule, last push binsha)??? self.ownerRepo = None try: self.problemHub = git.Repo(root) self.readLastCommitBinsha() except git.InvalidGitRepositoryError: if not testMode: self.problemHub = self.setup() else: pass except FileNotFoundError: self.commitTable = {} # self.acrot = git.Actor(author, authorEmaill) # def __str__(self):
def git_status(self): repo = Repo('./') o = repo.remotes.origin o.fetch() # Tags tags = [] for t in repo.tags: tags.append({"name": t.name, "commit": str(t.commit), "date": t.commit.committed_date, "committer": t.commit.committer.name, "message": t.commit.message}) try: branch_name = repo.active_branch.name # test1 except: branch_name = None changes = [] commits_behind = repo.iter_commits('master..origin/master') for c in list(commits_behind): changes.append({"committer": c.committer.name, "message": c.message}) return json.dumps({"tags": tags, "headcommit": str(repo.head.commit), "branchname": branch_name, "master": {"changes": changes}})
def pull_list_changed_files(git_path): """Pull new updates from remote origin.""" git_repo = git.Repo(git_path) logging.info(" HEAD: " + str(git_repo.head.commit)) logging.info(" is detached: " + str(git_repo.head.is_detached)) logging.info(" is dirty: " + str(git_repo.is_dirty())) if git_repo.head.is_detached: raise Exception("Detached head") if git_repo.is_dirty(): raise Exception("Dirty repository") files = [] cdnjs_origin = git_repo.remotes.origin fetch_info = cdnjs_origin.pull() for single_fetch_info in fetch_info: for diff in single_fetch_info.commit.diff( single_fetch_info.old_commit): logging.debug("Found diff: " + str(diff)) if not diff.a_blob is None: if not diff.a_blob.path in files: files.append(diff.a_blob.path) return files
def git_pull(): """ Git Pull handler """ app = get_app() if not have_git: session.flash = GIT_MISSING redirect(URL('site')) dialog = FORM.confirm(T('Pull'), {T('Cancel'): URL('site')}) if dialog.accepted: try: repo = git.Repo(os.path.join(apath(r=request), app)) origin = repo.remotes.origin origin.fetch() origin.pull() session.flash = T("Application updated via git pull") redirect(URL('site')) except git.CheckoutError: session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.") redirect(URL('site')) except git.UnmergedEntriesError: session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.") redirect(URL('site')) except git.GitCommandError: session.flash = T( "Pull failed, git exited abnormally. See logs for details.") redirect(URL('site')) except AssertionError: session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.") redirect(URL('site')) elif 'cancel' in request.vars: redirect(URL('site')) return dict(app=app, dialog=dialog)
def deps(repo: Optional[RepoSpec], format: str, file: Optional[str], on_uncloned: str) -> Iterable[str]: if file is not None: file = Path(file) if not file.exists(): raise ValueError(f"Dependency file does not exist: {file}") t = Template(format) for clone in iterDeps(repo, on_uncloned, file): try: hexsha = git.Repo(str(clone.path)).head.commit.hexsha except: hexsha = '0' * 40 try: yield t.substitute( H = hexsha, h = hexsha[:7], RS = clone.repospec.str(), rs = clone.repospec.str(False, False), p = str(clone.path), ) except KeyError as e: raise ValueError("Invalid format string specifier: %s" % e)
def git_helper(self): self.addHost('daemon', 'host', 'http://localhost', 'user', 'pw', force = True) r1 = git.Repo.init('repo1') Path('repo1/deps.got').write_text("repo2\nrepo3\n") r1.index.add(['deps.got']) r1.index.commit('Commit') with GotRun(['--here', 'host:repo1', 'repo1', '--force']): pass r2 = git.Repo.init('repo2') r2.index.commit('Commit') with GotRun(['--here', 'host:repo2', 'repo2', '--force']): pass r3 = git.Repo.init('repo3') r3.index.commit('Commit') with GotRun(['--here', 'host:repo3', 'repo3', '--force']): pass return r1, r2, r3
def get_remote_branch_info(): local_repo = Repo(path=settings.BASE_DIR) # Fetch remote branches to ensure we are up to date for remote in local_repo.remotes: remote.fetch() remote_repo = local_repo.remote() local_branch = local_repo.active_branch.name remote_branches = [] for this_branch in remote_repo.refs: remote_branches.append(this_branch.remote_head) return {'local_branch': local_branch, 'remote_branches': remote_branches}
def _initialize_repo(repo_url, repo_dir, branch_name, config, progress=None): """ Clones repository and configures it to use sparse checkout. Extraneous folders will get removed later using git read-tree """ util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_url)) # Clone repo repo = git.Repo.clone_from( repo_url, repo_dir, progress, branch=branch_name, ) # Use sparse checkout config = repo.config_writer() config.set_value('core', 'sparsecheckout', True) config.release() util.logger.info('Repo {} initialized'.format(repo_url))
def remote_git_add(config_as_dict, url_template, host): for repo_url, branch, app_name in repo_and_branch_and_app_name_iterator(config_as_dict): repo_dir_name = dir_name_for_repo(repo_url) repo_full_path = get_repo_full_path_for_repo_dir_name(repo_dir_name, config_as_dict) repo = git.Repo(repo_full_path) git_remote = url_template.format(host=host, app_name=app_name) gitremote_repo_name = get_remote_repo_name(config_as_dict) remote_names = [remote.name for remote in repo.remotes] remote_urls = [remote.url for remote in repo.remotes] if git_remote in remote_urls: print("Already in remote: %s" % git_remote) continue if gitremote_repo_name in remote_names: repo.delete_remote(gitremote_repo_name) print("Adding remote '%s' for %s" % (gitremote_repo_name, git_remote)) repo.create_remote(get_remote_repo_name(config_as_dict), git_remote)
def git_clone_all(config_as_dict): progress = GitProgress() for repo_url, branch, app_name, app_props in repo_and_branch_and_app_name_and_app_props_iterator(config_as_dict): repo_full_path = get_repo_full_path_for_repo_url(repo_url, config_as_dict) if os.path.exists(repo_full_path): print("Already cloned %s from %s" % (app_name, repo_url)) repo = git.Repo(repo_full_path) if repo.active_branch.name != branch: print("Your local checkout is in a different branch (%s) from the branch you want to deploy (%s). URL: %s" % (repo.active_branch.name, branch, repo_url)) repo.git.checkout(branch) origin = repo.remotes.origin #origin.fetch(branch) if config_as_dict["gitpull"]: origin.pull(branch) else: os.makedirs(repo_full_path) try: repo = git.Repo.clone_from(repo_url, repo_full_path, branch=branch, progress=progress) print("Cloned: %s" % repo) except git.GitCommandError as e: print(e) return False
def _initialize_repo(repo_name, repo_dir, config, progress=None): """ Clones repository and configures it to use sparse checkout. Extraneous folders will get removed later using git read-tree """ util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_name)) # Clone repo repo = git.Repo.clone_from( config['GITHUB_ORG'] + repo_name, repo_dir, progress, branch=config['REPO_BRANCH'], ) # Use sparse checkout config = repo.config_writer() config.set_value('core', 'sparsecheckout', True) config.release() util.logger.info('Repo {} initialized'.format(repo_name))
def _find_git_info(self): """ Return information about the state of the Git repository tox is being run from. :return: dict with keys 'dirty' (bool), 'sha' (str), 'tag' (str or None) :rtype: dict """ res = {} logger.debug('Checking git status...') repo = Repo(path=self._gitdir, search_parent_directories=False) res['sha'] = repo.head.commit.hexsha res['dirty'] = repo.is_dirty(untracked_files=True) res['tag'] = None for tag in repo.tags: # each is a git.Tag object if tag.commit.hexsha == res['sha']: res['tag'] = tag.name logger.debug('Git info: %s', res) return res
def _clone_project(project_name): gh = Github() (gh_repo, forked) = gh.get_repo(project_name, fork=True) if not gh_repo: click.echo('error: no repository named %s was found on ' 'your user and configured organizations' % click.style(project_name, bold=True)) sys.exit(1) if forked: click.echo('We found a repository named %s and forked it to your ' 'user, it now accessible at the following url:\n' '%s' % (forked.full_name, gh_repo.html_url)) dest_path = os.path.join(projects_path(), project_name) click.echo('We will clone %s in %s\n' % (gh_repo.ssh_url, dest_path)) vgit('clone', gh_repo.ssh_url, dest_path) # add our upstream remote if gh_repo.parent: repo = Repo(dest_path) repo.create_remote('upstream', gh_repo.parent.ssh_url)
def all_versions(filename): """ Find, open and parse all tagged versions of a json file, including the current version :param filename: The filename to find :return: a dictionary of all the versions, in the form { 'current': {...}, '1.0': {...}, '1.1': {...} } """ repo = git.Repo() versions = { 'current': get_json(filename) } for tag in repo.tags: version_dict = repo.git.show('%s:%s' % (tag.name, filename)) versions[tag.name.strip('v')] = json.loads(version_dict) return versions
def match_commit(self, src_dir): """Check that the current commit matches the recorded commit for this experiment. Raises an error if commits don't match, or if there is dirty state. Args: src_dir (str): path to the Git repository """ if self.metadata['dirty_repo']: raise EnvironmentError('Working directory was dirty when commit was recorded.') repo = Repo(src_dir) if repo.is_dirty(): raise EnvironmentError('Current working directory is dirty.') current_commit = repo.head.object.hexsha.encode('utf-8') exp_commit = self.metadata['commit'] if current_commit != exp_commit: raise EnvironmentError("Commits don't match.\nCurrent: {}\nRecorded: {}".format(current_commit, exp_commit))
def git_push(): """ Git Push handler """ app = get_app() if not have_git: session.flash = GIT_MISSING redirect(URL('site')) form = Form([Field('changelog', requires=IS_NOT_EMPTY())]) #form.element('input[type=submit]')['_value'] = T('Push') #form.add_button(T('Cancel'), URL('site')) if form.accepted: try: repo = git.Repo(os.path.join(apath(r=request), app)) index = repo.index index.add([apath(r=request) + app + '/*']) new_commit = index.commit(form.vars.changelog) origin = repo.remotes.origin origin.push() session.flash = T( "Git repo updated with latest application changes.") redirect(URL('site')) except git.UnmergedEntriesError: session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.") redirect(URL('site')) return dict(app=app, form=form)
def load(self): """Load the library.""" if not git: raise EnvironmentError(MISSING_GIT_ERROR) if os.path.exists(self.path): if not config.CACHE_DISABLE: return shutil.rmtree(self.path, ignore_errors=True) with files.remove_on_exception(self.path): url = self.GIT_URL.format(**vars(self)) repo = git.Repo.clone_from( url=url, to_path=self.path, b=self.branch) if self.commit: repo.head.reset(self.commit, index=True, working_tree=True)
def active_git_branch(path): try: repo = git.Repo(path) except git.exc.NoSuchPathError as error: return None if not repo.working_tree_dir: return None rval = repo.active_branch if not rval: return None rval = str(rval) return rval
def refresh_installed_packages(self): """Fetch latest git information for installed packages. This retrieves information about outdated packages, but does not actually upgrade their installations. Raises: IOError: if the package manifest file can't be written """ for ipkg in self.installed_packages(): clonepath = os.path.join(self.package_clonedir, ipkg.package.name) clone = git.Repo(clonepath) LOG.debug('fetch package %s', ipkg.package.qualified_name()) try: clone.remote().fetch() except git.exc.GitCommandError as error: LOG.warn('failed to fetch package %s: %s', ipkg.package.qualified_name(), error) ipkg.status.is_outdated = _is_clone_outdated( clone, ipkg.status.current_version, ipkg.status.tracking_method) self._write_manifest()
def test_errors_when_remote_has_changes(self): with git_bare_repo() as bare_repo: with git_repo() as repo: touch('README.md') repo.index.add(['README.md']) repo.index.commit('Initial commit') repo.create_remote('origin', url=bare_repo) repo.remotes.origin.push(repo.refs.master) with temp_directory() as path: clone = Repo(bare_repo).clone(path) touch('CHANGELOG.md') clone.index.add(['README.md']) clone.index.commit('Second commit') clone.remotes.origin.push(clone.refs.master) with self.assertRaises(Exception): GitReleaser()
def __init__(self, config=None): self.repo = Repo() self.commit_format = (config or {}).get('commit_format', 'Release {version}') self.tag_format = (config or {}).get('tag_format', '{version}') if self.repo.head.ref != self.repo.heads.master: # TODO: Support releasing from stable/hotfix branches raise Exception('You need to be on the `master` branch in order to do a release.') if self.repo.is_dirty(): raise Exception('Git repository has unstaged changes.') if len(self.repo.untracked_files) > 0: raise Exception('Git repository has untracked files.') if self.has_origin(): self.repo.remotes.origin.fetch() if self.repo.remotes.origin.refs.master.commit != self.repo.head.ref.commit: raise Exception('Master has unsynced changes.')
def git_list_create_tag(working_dir, git_list_file, tag_name, delete_tag_already_exists): git_list = [] with open(git_list_file,'r') as f: for row in f: row = row.strip() if row == "" or row.startswith('#'): continue git_list.append(row) for repo_url in git_list: code_dir = "%s/%s" % (working_dir, get_repo_name(repo_url)) git_repo = None if os.path.exists(code_dir): # re-use current code folder git_repo = git.Repo(code_dir) else: git_repo = git.Repo.clone_from(repo_url, code_dir) git_create_tag(git_repo, tag_name, delete_tag_already_exists) return True
def get_route_info(root_path): routing_path = os.path.join(root_path, "Uber", "uber", "routing.py") with open(routing_path) as f: file_contents = f.read() tree = ast.parse(file_contents) visitor = RouteVisitor() visitor.visit(tree) all_routes = visitor.routable_funcs # Get the commit that added each route and tack it onto the details route_linenos = sorted(set(x[0] for x in all_routes.values())) repo = git.Repo(root_path) blame_by_line = get_line_blames(repo, routing_path, route_linenos) for name, details in all_routes.iteritems(): all_routes[name] = details + (blame_by_line[details[0]],) return all_routes
def install(self, plugin): "Installs a GitHub plugin" print("Installing...", plugin.name) import git # Clone the GitHub repository via the URL. # git.Git().clone(str(plugin.baseurl), install_dir) # Checks if the plugin installation path already exists. if not self.isInstalled(plugin): """Clone the GitHub repository via Plugin URL to install_dir and with depth=1 (shallow clone). """ git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1) print("Done!") return True else: print("Plugin already installed!") return False
def test_add_plugin_from_git_exception(plugin_manager_fixture, mocker): plugin_manager = plugin_manager_fixture() mock_git = mocker.patch("infrared.core.services.plugins.git") mock_git.Repo.clone_from.side_effect = git.exc.GitCommandError( "some_git_cmd", 1) mock_git.exc.GitCommandError = git.exc.GitCommandError mock_tempfile = mocker.patch("infrared.core.services.plugins.tempfile") mock_shutil = mocker.patch("infrared.core.services.plugins.shutil") mock_os = mocker.patch("infrared.core.services.plugins.os") mock_os.path.exists.return_value = False # add_plugin call with pytest.raises(IRFailedToAddPlugin): plugin_manager.add_plugin( "https://sample_github.null/plugin_repo.git") mock_shutil.rmtree.assert_called_with(mock_tempfile.mkdtemp.return_value)
def createGitRepository(): # Create the git repository, or change into it if it exists datastore = 'datastore' if not os.path.isdir(datastore): os.makedirs(datastore) git.Repo.init(datastore) repo = git.Repo(datastore) os.chdir(datastore) return repo