Python git 模块,Repo() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用git.Repo()

项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def git_push():
    """ Git Push handler """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
    form.element('input[type=submit]')['_value'] = T('Push')
    form.add_button(T('Cancel'), URL('site'))
    form.process()
    if form.accepted:
        try:
            repo = git.Repo(os.path.join(apath(r=request), app))
            index = repo.index
            index.add([apath(r=request) + app + '/*'])
            new_commit = index.commit(form.vars.changelog)
            origin = repo.remotes.origin
            origin.push()
            session.flash = T(
                "Git repo updated with latest application changes.")
            redirect(URL('site'))
        except git.UnmergedEntriesError:
            session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
            redirect(URL('site'))
    return dict(app=app, form=form)
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def make_testing_branch():
    repo = Repo('.')
    gitobj = repo.git
    # create 'test' branch if it doesn't exist so that it can be used for tests in this module
    git_branch_string = gitobj.branch()
    git_branch_list = git_branch_string.split("\n")
    clean_branch_list = []
    for branch in git_branch_list:
        branch = branch.replace('*', '')
        branch = branch.replace(' ', '')
        clean_branch_list.append(branch)
    if 'testing_branch' in clean_branch_list:
        pass
    else:
        gitobj.branch('testing_branch')


# deletes the temporary new git branch (testing_branch) for testing
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def delete_testing_branch():
    repo = Repo('.')
    gitobj = repo.git
    # create 'test' branch if it doesn't exist so that it can be used for tests in this module
    git_branch_string = gitobj.branch()
    git_branch_list = git_branch_string.split("\n")
    clean_branch_list = []
    for branch in git_branch_list:
        branch = branch.replace('*', '')
        branch = branch.replace(' ', '')
        clean_branch_list.append(branch)
    if 'testing_branch' in clean_branch_list:
        gitobj.branch('-d', 'testing_branch')

# ///////////////////////////////////////////////////////
#
# pytest capsys capture tests
#    confirms capture of std output and std error streams
#
# ///////////////////////////////////////////////////////
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def make_testing_branch():
    repo = Repo('.')
    gitobj = repo.git
    # create 'test' branch if it doesn't exist so that it can be used for tests in this module
    git_branch_string = gitobj.branch()
    git_branch_list = git_branch_string.split("\n")
    clean_branch_list = []
    for branch in git_branch_list:
        branch = branch.replace('*', '')
        branch = branch.replace(' ', '')
        clean_branch_list.append(branch)
    if 'testing_branch' in clean_branch_list:
        pass
    else:
        gitobj.branch('testing_branch')

# deletes the temporary new git branch (testing_branch) for testing
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def delete_testing_branch():
    repo = Repo('.')
    gitobj = repo.git
    # create 'test' branch if it doesn't exist so that it can be used for tests in this module
    git_branch_string = gitobj.branch()
    git_branch_list = git_branch_string.split("\n")
    clean_branch_list = []
    for branch in git_branch_list:
        branch = branch.replace('*', '')
        branch = branch.replace(' ', '')
        clean_branch_list.append(branch)
    if 'testing_branch' in clean_branch_list:
        gitobj.branch('-d', 'testing_branch')


# ///////////////////////////////////////////////////////
#
#  Diff class tests
#
# ///////////////////////////////////////////////////////
项目:BookCloud    作者:livro-aberto    | 项目源码 | 文件源码
def clone(self, name, user):
        """
        create a clone of self with a given name, owned by user
        """
        self.expiration = None
        # create the branch on the database
        new_branch = Branch(name, self.project, self, user)
        db.session.add(new_branch)
        db.session.commit()
        # clone repository in file system
        branch_path = os.path.abspath(join(os.getcwd(), 'repos',
                                           self.project.name,
                                           name, 'source'))
        self.get_repo().clone(branch_path, branch=self.name)
        os.symlink(os.path.abspath(join('repos', self.project.name,
                                        '_resources/low_resolution')),
                   join(branch_path, '_resources'))
        branch_repo = git.Repo(branch_path)
        branch_repo.git.checkout('HEAD', b=name)
        config_repo(branch_repo, user.username, user.email)
        # build the source
        new_branch.build(timeout=60)
        return new_branch
项目:airflow-gcp-k8s    作者:alexvanboxel    | 项目源码 | 文件源码
def git_log(repo, prev_head):
    r = git.Repo(repo)
    if r.head.object.hexsha == prev_head:
        return []

    git_commit_fields = ['id', 'author_name', 'author_email', 'date', 'message']
    git_log_format = ['%H', '%an', '%ae', '%ad', '%s']
    git_log_format = '%x1f'.join(git_log_format) + '%x1e'

    p = Popen('cd %s && git log %s..HEAD --format="%s"' % (repo, prev_head, git_log_format), shell=True, stdout=PIPE)
    (log, _) = p.communicate()
    log = log.strip('\n\x1e').split("\x1e")
    log = [row.strip().split("\x1f") for row in log]
    log = [dict(zip(git_commit_fields, row)) for row in log]

    return log
项目:mimiron    作者:ImageIntelligence    | 项目源码 | 文件源码
def process(self):
        """Initialises Mimiron using the configuration found in `config_path`."""
        for i, repo in enumerate(self.data['terraformRepositories']):
            repo['path'] = os.path.expanduser(repo['path'])
            try:
                git_repo = Repo(repo['path'])
                if git_repo.bare:
                    raise _InvalidGitRepositoryError
                repo['defaultEnvironment'] = repo.get('defaultEnvironment', None)
                repo['tagEnvironment'] = repo.get('tagEnvironment', None)

                repo['git'] = git_repo
                repo['tfvars'] = self._read_tfvars(repo)
            except _InvalidGitRepositoryError:
                raise InvalidGitRepository(repo['path'])
            except _NoSuchPathError:
                raise DeploymentRepositoriesNotSpecified
项目:network-monitoring    作者:rubienr    | 项目源码 | 文件源码
def render(self, context):
        try:
            r = Repo(BASE_DIR)
            version = r.rev_parse("HEAD").hexsha
            context['git'] = {
                "shortHexSha": version[0:7],
                "hexSha": version,
                "shortRemote": "https://github.com",
                "remote": "https://github.com/rubienr/network-monitoring"
            }
        except Exception:
            context['git'] = {
                "shortHexSha": None,
                "hexSha": None,
                "shortRemote": None,
                "remote": None,
            }
        return ""
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def git_push():
    """ Git Push handler """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
    form.element('input[type=submit]')['_value'] = T('Push')
    form.add_button(T('Cancel'), URL('site'))
    form.process()
    if form.accepted:
        try:
            repo = git.Repo(os.path.join(apath(r=request), app))
            index = repo.index
            index.add([apath(r=request) + app + '/*'])
            new_commit = index.commit(form.vars.changelog)
            origin = repo.remotes.origin
            origin.push()
            session.flash = T(
                "Git repo updated with latest application changes.")
            redirect(URL('site'))
        except git.UnmergedEntriesError:
            session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
            redirect(URL('site'))
    return dict(app=app, form=form)
项目:skybeard-2    作者:LanceMaverick    | 项目源码 | 文件源码
def download_beard(beard_name, upgrade):
    beard_details = find_record(beard_name)
    git_ = git.Git("beard_cache")
    print("Attempting to clone from {}...".format(beard_details['git_url']))
    try:
        repo_dir = join("beard_cache", beard_name)
        os.makedirs(repo_dir)
        repo = git.Repo()
        repo.clone_from(beard_details['git_url'], repo_dir)
        print("Done!")
    except FileExistsError:
        repo = git.Repo("beard_cache/{}".format(beard_name))
        if upgrade:
            print("Updating repo")
            # Should be origin, since no-one should add another remote.
            repo.remotes.origin.pull()
            print("Done!")
        else:
            print("Repo already exists. Nothing to do.")
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_rename(self, rwdir):
        parent = git.Repo.init(osp.join(rwdir, 'parent'))
        sm_name = 'mymodules/myname'
        sm = parent.create_submodule(sm_name, sm_name, url=self._small_repo_url())
        parent.index.commit("Added submodule")

        assert sm.rename(sm_name) is sm and sm.name == sm_name
        assert not sm.repo.is_dirty(index=True, working_tree=False, untracked_files=False)

        new_path = 'renamed/myname'
        assert sm.move(new_path).name == new_path

        new_sm_name = "shortname"
        assert sm.rename(new_sm_name) is sm
        assert sm.repo.is_dirty(index=True, working_tree=False, untracked_files=False)
        assert sm.exists()

        sm_mod = sm.module()
        if osp.isfile(osp.join(sm_mod.working_tree_dir, '.git')) == sm._need_gitfile_submodules(parent.git):
            assert sm_mod.git_dir.endswith(join_path_native('.git', 'modules', new_sm_name))
        # end
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def _clone_repo(cls, repo, url, path, name, **kwargs):
        """:return: Repo instance of newly cloned repository
        :param repo: our parent repository
        :param url: url to clone from
        :param path: repository-relative path to the submodule checkout location
        :param name: canonical of the submodule
        :param kwrags: additinoal arguments given to git.clone"""
        module_abspath = cls._module_abspath(repo, path, name)
        module_checkout_path = module_abspath
        if cls._need_gitfile_submodules(repo.git):
            kwargs['separate_git_dir'] = module_abspath
            module_abspath_dir = osp.dirname(module_abspath)
            if not osp.isdir(module_abspath_dir):
                os.makedirs(module_abspath_dir)
            module_checkout_path = osp.join(repo.working_tree_dir, path)
        # end

        clone = git.Repo.clone_from(url, module_checkout_path, **kwargs)
        if cls._need_gitfile_submodules(repo.git):
            cls._write_git_file_and_module_config(module_checkout_path, module_abspath)
        # end
        return clone
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def module(self):
        """:return: Repo instance initialized from the repository at our submodule path
        :raise InvalidGitRepositoryError: if a repository was not available. This could
            also mean that it was not yet initialized"""
        # late import to workaround circular dependencies
        module_checkout_abspath = self.abspath
        try:
            repo = git.Repo(module_checkout_abspath)
            if repo != self.repo:
                return repo
            # END handle repo uninitialized
        except (InvalidGitRepositoryError, NoSuchPathError):
            raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath)
        else:
            raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath)
        # END handle exceptions
项目:github-reviewboard-sync    作者:Apptimize-OSS    | 项目源码 | 文件源码
def _get_org_and_name_from_remote(remote):
    """
    Gets the org name and the repo name
    for a github remote.

    :param git.Remote remote:
    :return: The owner and the repo name in that order
    :rtype: unicode, unicode
    """
    url = remote.config_reader.get('url')
    if not url.endswith('.git'):
        url = '{0}.git'.format(url)
    git_info = parse(url)
    _LOG.debug('Repo owner: "{0}"'.format(git_info.owner))
    _LOG.debug('Repo name: "{0}"'.format(git_info.repo))
    return git_info.owner, git_info.repo
项目:github-reviewboard-sync    作者:Apptimize-OSS    | 项目源码 | 文件源码
def _create_pull_request_helper(repo, remote_repo, branch, base, auth=None):
    """
    Creates a pull request to merge the branch into the base.
    Ignores the error if there is already a pull request open
    for the given branch->base

    :param Repo repo:
    :param Repository remote_repo:
    :param unicode branch:
    :param unicode base:
    """
    title, message = construct_message(repo, base, branch)
    try:
        _LOG.info('Creating pull request')
        return remote_repo.create_pull(title=title, head=branch, base=base,
                                       body='Autogenerated: \n\n{0}'.format(message))
    except GithubException as exc:
        if 'errors' in exc.data and len(exc.data['errors']) == 1 and \
            exc.data['errors'][0].get('message', '').startswith('A pull request already exists for'):
            _LOG.warning('A pull request already exists for "{0}".  Continuing.'.format(branch))
            return _get_pull_request(remote_repo, branch, base)
        else:
            raise exc
项目:github-reviewboard-sync    作者:Apptimize-OSS    | 项目源码 | 文件源码
def _get_relevant_commits(repo, commit_shas, branch):
    """
    Gets all commits on the repo with the given shas

    :param Repo repo:
    :param list[unicode] commit_shas:
    :param unicode branch:
    :return: list[Commit]
    """
    remaining_shas = set(commit_shas)
    commits = list()
    for commit in repo.iter_commits(branch):
        if commit.hexsha in remaining_shas:
            commits.append(commit)
            remaining_shas.remove(commit.hexsha)
    return commits
项目:github-reviewboard-sync    作者:Apptimize-OSS    | 项目源码 | 文件源码
def _get_remote(repo, name):
    """
    Gets the remote object raising a MissingRemoteException
    when it does not exist

    :param Repo repo:
    :param unicode name: The remote name
    :return: The remote object
    :rtype: git.remote.Remote
    :raises: MissingRemoteException
    """
    try:
        return repo.remotes[name]
    except IndexError:  # I have no idea why they raise an IndexError instead of KeyError
        raise MissingRemoteException('The remote "{0}" does not exist.  '
                                     'Please select a different remote or'
                                     ' add it using "git remote add" command')
项目:github-reviewboard-sync    作者:Apptimize-OSS    | 项目源码 | 文件源码
def push_repo(repo, branch, remote='origin', remote_branch=None):
    """
    Pushes the repo up to the remote.  It will set
    the upstream to the remote branch.

    :param Repo repo: The repo you wish to push
    :param unicode branch: The branch being pushed
    :param unicode remote: The remote to use (e.g. ``'origin'``
    :param unicode remote_branch: The remote branch.  Defaults
        to the `branch` parameter
    :return: None
    :raises: PushFailedException
    """
    remote_branch = remote_branch or branch
    _checkout_branch(repo, branch)
    _LOG.info("Pushing all commits to remote '{0}'".format(remote))
    remote = _get_remote(repo, remote)
    try:
        remote.push(remote_branch, set_upstream=True)
    except GitCommandError as e:
        _LOG.error(str(e))
        raise PushFailedException('Uh oh, it seems like something went'
                                  ' wrong with the push. "{0}"'.format(str(e)))
项目:open-project-linter    作者:OpenNewsLabs    | 项目源码 | 文件源码
def check_multiple_branches(repository):
    """Check whether a git repository has more than one branch.

    Parameters
    ----------
    repository : string
        Path to a git repository

    Results
    -------
    boolean
        True if the repository has more than 1 branch, False otherwise

    Raises
    ------
    git.InvalidGitRepositoryError if repository is a path to a directory
        but not a git repository
    git.NoSuchPathError if repository is not a path to a directory
    """
    repo = git.Repo(repository)
    branches = repo.branches
    if len(branches) > 1:
        return True
    else:
        return False
项目:Orange-Juice-Problem-Control    作者:function-x    | 项目源码 | 文件源码
def __init__(self, root, testMode=False):
        super(GitManager, self).__init__()
        self.root = root
        self.commitTable = {}  # ??????????(submodule, last push binsha)???
        self.ownerRepo = None
        try:
            self.problemHub = git.Repo(root)
            self.readLastCommitBinsha()
        except git.InvalidGitRepositoryError:
            if not testMode:
                self.problemHub = self.setup()
            else:
                pass
        except FileNotFoundError:
            self.commitTable = {}
        # self.acrot = git.Actor(author, authorEmaill)

    # def __str__(self):
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def git_status(self):
        repo = Repo('./')
        o = repo.remotes.origin
        o.fetch()
        # Tags
        tags = []
        for t in repo.tags:
            tags.append({"name": t.name, "commit": str(t.commit), "date": t.commit.committed_date,
                         "committer": t.commit.committer.name, "message": t.commit.message})
        try:
            branch_name = repo.active_branch.name
            # test1
        except:
            branch_name = None

        changes = []
        commits_behind = repo.iter_commits('master..origin/master')

        for c in list(commits_behind):
            changes.append({"committer": c.committer.name, "message": c.message})

        return json.dumps({"tags": tags, "headcommit": str(repo.head.commit), "branchname": branch_name,
                           "master": {"changes": changes}})
项目:ExtensionCrawler    作者:logicalhacking    | 项目源码 | 文件源码
def pull_list_changed_files(git_path):
    """Pull new updates from remote origin."""
    git_repo = git.Repo(git_path)
    logging.info(" HEAD: " + str(git_repo.head.commit))
    logging.info("   is detached: " + str(git_repo.head.is_detached))
    logging.info("   is dirty: " + str(git_repo.is_dirty()))
    if git_repo.head.is_detached:
        raise Exception("Detached head")
    if git_repo.is_dirty():
        raise Exception("Dirty repository")

    files = []
    cdnjs_origin = git_repo.remotes.origin
    fetch_info = cdnjs_origin.pull()
    for single_fetch_info in fetch_info:
        for diff in single_fetch_info.commit.diff(
                single_fetch_info.old_commit):
            logging.debug("Found diff: " + str(diff))
            if not diff.a_blob is None:
                if not diff.a_blob.path in files:
                    files.append(diff.a_blob.path)
    return files
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def git_pull():
    """ Git Pull handler """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            repo = git.Repo(os.path.join(apath(r=request), app))
            origin = repo.remotes.origin
            origin.fetch()
            origin.pull()
            session.flash = T("Application updated via git pull")
            redirect(URL('site'))

        except git.CheckoutError:
            session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
            redirect(URL('site'))
        except git.UnmergedEntriesError:
            session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
            redirect(URL('site'))
        except git.GitCommandError:
            session.flash = T(
                "Pull failed, git exited abnormally. See logs for details.")
            redirect(URL('site'))
        except AssertionError:
            session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
            redirect(URL('site'))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def git_push():
    """ Git Push handler """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
    form.element('input[type=submit]')['_value'] = T('Push')
    form.add_button(T('Cancel'), URL('site'))
    form.process()
    if form.accepted:
        try:
            repo = git.Repo(os.path.join(apath(r=request), app))
            index = repo.index
            index.add([apath(r=request) + app + '/*'])
            new_commit = index.commit(form.vars.changelog)
            origin = repo.remotes.origin
            origin.push()
            session.flash = T(
                "Git repo updated with latest application changes.")
            redirect(URL('site'))
        except git.UnmergedEntriesError:
            session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
            redirect(URL('site'))
    return dict(app=app, form=form)
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def deps(repo: Optional[RepoSpec], format: str, file: Optional[str], on_uncloned: str) -> Iterable[str]:
    if file is not None:
        file = Path(file)
        if not file.exists():
            raise ValueError(f"Dependency file does not exist: {file}")
    t = Template(format)
    for clone in iterDeps(repo, on_uncloned, file):
        try:
            hexsha = git.Repo(str(clone.path)).head.commit.hexsha
        except:
            hexsha = '0' * 40
        try:
            yield t.substitute(
                H = hexsha,
                h = hexsha[:7],
                RS = clone.repospec.str(),
                rs = clone.repospec.str(False, False),
                p = str(clone.path),
            )
        except KeyError as e:
            raise ValueError("Invalid format string specifier: %s" % e)
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def git_helper(self):
        self.addHost('daemon', 'host', 'http://localhost', 'user', 'pw', force = True)

        r1 = git.Repo.init('repo1')
        Path('repo1/deps.got').write_text("repo2\nrepo3\n")
        r1.index.add(['deps.got'])
        r1.index.commit('Commit')
        with GotRun(['--here', 'host:repo1', 'repo1', '--force']):
            pass

        r2 = git.Repo.init('repo2')
        r2.index.commit('Commit')
        with GotRun(['--here', 'host:repo2', 'repo2', '--force']):
            pass

        r3 = git.Repo.init('repo3')
        r3.index.commit('Commit')
        with GotRun(['--here', 'host:repo3', 'repo3', '--force']):
            pass

        return r1, r2, r3
项目:fermentrack    作者:thorrak    | 项目源码 | 文件源码
def get_remote_branch_info():
    local_repo = Repo(path=settings.BASE_DIR)

    # Fetch remote branches to ensure we are up to date
    for remote in local_repo.remotes:
        remote.fetch()

    remote_repo = local_repo.remote()

    local_branch = local_repo.active_branch.name
    remote_branches = []

    for this_branch in remote_repo.refs:
        remote_branches.append(this_branch.remote_head)

    return {'local_branch': local_branch, 'remote_branches': remote_branches}
项目:nbpuller    作者:data-8    | 项目源码 | 文件源码
def _initialize_repo(repo_url, repo_dir, branch_name, config, progress=None):
    """
    Clones repository and configures it to use sparse checkout.
    Extraneous folders will get removed later using git read-tree
    """
    util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_url))
    # Clone repo
    repo = git.Repo.clone_from(
        repo_url,
        repo_dir,
        progress,
        branch=branch_name,
    )

    # Use sparse checkout
    config = repo.config_writer()
    config.set_value('core', 'sparsecheckout', True)
    config.release()

    util.logger.info('Repo {} initialized'.format(repo_url))
项目:ndeploy    作者:sglebs    | 项目源码 | 文件源码
def remote_git_add(config_as_dict, url_template, host):
    for repo_url, branch, app_name in repo_and_branch_and_app_name_iterator(config_as_dict):
        repo_dir_name = dir_name_for_repo(repo_url)
        repo_full_path = get_repo_full_path_for_repo_dir_name(repo_dir_name, config_as_dict)
        repo = git.Repo(repo_full_path)
        git_remote = url_template.format(host=host, app_name=app_name)
        gitremote_repo_name = get_remote_repo_name(config_as_dict)
        remote_names = [remote.name for remote in repo.remotes]
        remote_urls = [remote.url for remote in repo.remotes]
        if git_remote in remote_urls:
            print("Already in remote: %s" % git_remote)
            continue
        if gitremote_repo_name in remote_names:
            repo.delete_remote(gitremote_repo_name)
        print("Adding remote '%s' for %s" % (gitremote_repo_name, git_remote))
        repo.create_remote(get_remote_repo_name(config_as_dict), git_remote)
项目:ndeploy    作者:sglebs    | 项目源码 | 文件源码
def git_clone_all(config_as_dict):
    progress = GitProgress()
    for repo_url, branch, app_name, app_props in repo_and_branch_and_app_name_and_app_props_iterator(config_as_dict):
        repo_full_path = get_repo_full_path_for_repo_url(repo_url, config_as_dict)
        if os.path.exists(repo_full_path):
            print("Already cloned %s from %s" % (app_name, repo_url))
            repo = git.Repo(repo_full_path)
            if repo.active_branch.name != branch:
                print("Your local checkout is in a different branch (%s) from the branch you want to deploy (%s). URL: %s" %
                      (repo.active_branch.name, branch, repo_url))
                repo.git.checkout(branch)
            origin = repo.remotes.origin
            #origin.fetch(branch)
            if config_as_dict["gitpull"]:
                origin.pull(branch)
        else:
            os.makedirs(repo_full_path)
            try:
                repo = git.Repo.clone_from(repo_url, repo_full_path, branch=branch, progress=progress)
                print("Cloned: %s" % repo)
            except git.GitCommandError as e:
                print(e)
                return False
项目:interact    作者:data-8    | 项目源码 | 文件源码
def _initialize_repo(repo_name, repo_dir, config, progress=None):
    """
    Clones repository and configures it to use sparse checkout.
    Extraneous folders will get removed later using git read-tree
    """
    util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_name))
    # Clone repo
    repo = git.Repo.clone_from(
        config['GITHUB_ORG'] + repo_name,
        repo_dir,
        progress,
        branch=config['REPO_BRANCH'],
    )

    # Use sparse checkout
    config = repo.config_writer()
    config.set_value('core', 'sparsecheckout', True)
    config.release()

    util.logger.info('Repo {} initialized'.format(repo_name))
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def _find_git_info(self):
        """
        Return information about the state of the Git repository tox is being
        run from.

        :return: dict with keys 'dirty' (bool), 'sha' (str), 'tag' (str or None)
        :rtype: dict
        """
        res = {}
        logger.debug('Checking git status...')
        repo = Repo(path=self._gitdir, search_parent_directories=False)
        res['sha'] = repo.head.commit.hexsha
        res['dirty'] = repo.is_dirty(untracked_files=True)
        res['tag'] = None
        for tag in repo.tags:
            # each is a git.Tag object
            if tag.commit.hexsha == res['sha']:
                res['tag'] = tag.name
        logger.debug('Git info: %s', res)
        return res
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _clone_project(project_name):
    gh = Github()

    (gh_repo, forked) = gh.get_repo(project_name, fork=True)
    if not gh_repo:
        click.echo('error: no repository named %s was found on '
                   'your user and configured organizations' %
                   click.style(project_name, bold=True))
        sys.exit(1)

    if forked:
        click.echo('We found a repository named %s and forked it to your '
                   'user, it now accessible at the following url:\n'
                   '%s' % (forked.full_name, gh_repo.html_url))

    dest_path = os.path.join(projects_path(), project_name)
    click.echo('We will clone %s in %s\n' % (gh_repo.ssh_url, dest_path))

    vgit('clone', gh_repo.ssh_url, dest_path)

    # add our upstream remote
    if gh_repo.parent:
        repo = Repo(dest_path)
        repo.create_remote('upstream', gh_repo.parent.ssh_url)
项目:vulnerability-rating-taxonomy    作者:bugcrowd    | 项目源码 | 文件源码
def all_versions(filename):
    """
    Find, open and parse all tagged versions of a json file, including the current version

    :param filename: The filename to find
    :return: a dictionary of all the versions, in the form
        {
            'current': {...},
            '1.0': {...},
            '1.1': {...}
        }
    """
    repo = git.Repo()
    versions = {
        'current': get_json(filename)
    }
    for tag in repo.tags:
        version_dict = repo.git.show('%s:%s' % (tag.name, filename))
        versions[tag.name.strip('v')] = json.loads(version_dict)
    return versions
项目:lang2program    作者:kelvinguu    | 项目源码 | 文件源码
def match_commit(self, src_dir):
        """Check that the current commit matches the recorded commit for this experiment.

        Raises an error if commits don't match, or if there is dirty state.

        Args:
            src_dir (str): path to the Git repository
        """
        if self.metadata['dirty_repo']:
            raise EnvironmentError('Working directory was dirty when commit was recorded.')

        repo = Repo(src_dir)
        if repo.is_dirty():
            raise EnvironmentError('Current working directory is dirty.')

        current_commit = repo.head.object.hexsha.encode('utf-8')
        exp_commit = self.metadata['commit']
        if current_commit != exp_commit:
            raise EnvironmentError("Commits don't match.\nCurrent: {}\nRecorded: {}".format(current_commit, exp_commit))
项目:lang2program    作者:kelvinguu    | 项目源码 | 文件源码
def match_commit(self, src_dir):
        """Check that the current commit matches the recorded commit for this experiment.

        Raises an error if commits don't match, or if there is dirty state.

        Args:
            src_dir (str): path to the Git repository
        """
        if self.metadata['dirty_repo']:
            raise EnvironmentError('Working directory was dirty when commit was recorded.')

        repo = Repo(src_dir)
        if repo.is_dirty():
            raise EnvironmentError('Current working directory is dirty.')

        current_commit = repo.head.object.hexsha.encode('utf-8')
        exp_commit = self.metadata['commit']
        if current_commit != exp_commit:
            raise EnvironmentError("Commits don't match.\nCurrent: {}\nRecorded: {}".format(current_commit, exp_commit))
项目:web3py    作者:web2py    | 项目源码 | 文件源码
def git_push():
    """ Git Push handler """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = Form([Field('changelog', requires=IS_NOT_EMPTY())])
    #form.element('input[type=submit]')['_value'] = T('Push')
    #form.add_button(T('Cancel'), URL('site'))

    if form.accepted:
        try:
            repo = git.Repo(os.path.join(apath(r=request), app))
            index = repo.index
            index.add([apath(r=request) + app + '/*'])
            new_commit = index.commit(form.vars.changelog)
            origin = repo.remotes.origin
            origin.push()
            session.flash = T(
                "Git repo updated with latest application changes.")
            redirect(URL('site'))
        except git.UnmergedEntriesError:
            session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
            redirect(URL('site'))
    return dict(app=app, form=form)
项目:loady    作者:timedata-org    | 项目源码 | 文件源码
def load(self):
        """Load the library."""
        if not git:
            raise EnvironmentError(MISSING_GIT_ERROR)

        if os.path.exists(self.path):
            if not config.CACHE_DISABLE:
                return
            shutil.rmtree(self.path, ignore_errors=True)

        with files.remove_on_exception(self.path):
            url = self.GIT_URL.format(**vars(self))
            repo = git.Repo.clone_from(
                url=url, to_path=self.path, b=self.branch)
            if self.commit:
                repo.head.reset(self.commit, index=True, working_tree=True)
项目:package-manager    作者:bro    | 项目源码 | 文件源码
def active_git_branch(path):
    try:
        repo = git.Repo(path)
    except git.exc.NoSuchPathError as error:
        return None

    if not repo.working_tree_dir:
        return None

    rval = repo.active_branch

    if not rval:
        return None

    rval = str(rval)
    return rval
项目:package-manager    作者:bro    | 项目源码 | 文件源码
def refresh_installed_packages(self):
        """Fetch latest git information for installed packages.

        This retrieves information about outdated packages, but does
        not actually upgrade their installations.

        Raises:
            IOError: if the package manifest file can't be written
        """
        for ipkg in self.installed_packages():
            clonepath = os.path.join(self.package_clonedir, ipkg.package.name)
            clone = git.Repo(clonepath)
            LOG.debug('fetch package %s', ipkg.package.qualified_name())

            try:
                clone.remote().fetch()
            except git.exc.GitCommandError as error:
                LOG.warn('failed to fetch package %s: %s',
                         ipkg.package.qualified_name(), error)

            ipkg.status.is_outdated = _is_clone_outdated(
                clone, ipkg.status.current_version, ipkg.status.tracking_method)

        self._write_manifest()
项目:maintain    作者:kylef    | 项目源码 | 文件源码
def test_errors_when_remote_has_changes(self):
        with git_bare_repo() as bare_repo:
            with git_repo() as repo:
                touch('README.md')
                repo.index.add(['README.md'])
                repo.index.commit('Initial commit')
                repo.create_remote('origin', url=bare_repo)
                repo.remotes.origin.push(repo.refs.master)

                with temp_directory() as path:
                    clone = Repo(bare_repo).clone(path)
                    touch('CHANGELOG.md')
                    clone.index.add(['README.md'])
                    clone.index.commit('Second commit')
                    clone.remotes.origin.push(clone.refs.master)

                with self.assertRaises(Exception):
                    GitReleaser()
项目:maintain    作者:kylef    | 项目源码 | 文件源码
def __init__(self, config=None):
        self.repo = Repo()

        self.commit_format = (config or {}).get('commit_format', 'Release {version}')
        self.tag_format = (config or {}).get('tag_format', '{version}')

        if self.repo.head.ref != self.repo.heads.master:
            # TODO: Support releasing from stable/hotfix branches
            raise Exception('You need to be on the `master` branch in order to do a release.')

        if self.repo.is_dirty():
            raise Exception('Git repository has unstaged changes.')

        if len(self.repo.untracked_files) > 0:
            raise Exception('Git repository has untracked files.')

        if self.has_origin():
            self.repo.remotes.origin.fetch()

            if self.repo.remotes.origin.refs.master.commit != self.repo.head.ref.commit:
                raise Exception('Master has unsynced changes.')
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def git_push():
    """ Git Push handler """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
    form.element('input[type=submit]')['_value'] = T('Push')
    form.add_button(T('Cancel'), URL('site'))
    form.process()
    if form.accepted:
        try:
            repo = git.Repo(os.path.join(apath(r=request), app))
            index = repo.index
            index.add([apath(r=request) + app + '/*'])
            new_commit = index.commit(form.vars.changelog)
            origin = repo.remotes.origin
            origin.push()
            session.flash = T(
                "Git repo updated with latest application changes.")
            redirect(URL('site'))
        except git.UnmergedEntriesError:
            session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
            redirect(URL('site'))
    return dict(app=app, form=form)
项目:chef_community_cookbooks    作者:DennyZhang    | 项目源码 | 文件源码
def git_list_create_tag(working_dir, git_list_file, tag_name, delete_tag_already_exists):
    git_list = []
    with open(git_list_file,'r') as f:
        for row in f:
            row = row.strip()
            if row == "" or row.startswith('#'):
                continue
            git_list.append(row)

    for repo_url in git_list:
        code_dir = "%s/%s" % (working_dir, get_repo_name(repo_url))
        git_repo = None
        if os.path.exists(code_dir):
            # re-use current code folder
            git_repo = git.Repo(code_dir)
        else:
            git_repo = git.Repo.clone_from(repo_url, code_dir)

        git_create_tag(git_repo, tag_name, delete_tag_already_exists)
    return True
项目:focuson    作者:uber    | 项目源码 | 文件源码
def get_route_info(root_path):
    routing_path = os.path.join(root_path, "Uber", "uber", "routing.py")
    with open(routing_path) as f:
        file_contents = f.read()
    tree = ast.parse(file_contents)
    visitor = RouteVisitor()
    visitor.visit(tree)
    all_routes = visitor.routable_funcs

    # Get the commit that added each route and tack it onto the details
    route_linenos = sorted(set(x[0] for x in all_routes.values()))
    repo = git.Repo(root_path)
    blame_by_line = get_line_blames(repo, routing_path, route_linenos)
    for name, details in all_routes.iteritems():
        all_routes[name] = details + (blame_by_line[details[0]],)
    return all_routes
项目:PluginManager    作者:mandeeps708    | 项目源码 | 文件源码
def install(self, plugin):
        "Installs a GitHub plugin"

        print("Installing...", plugin.name)
        import git

        # Clone the GitHub repository via the URL.
        # git.Git().clone(str(plugin.baseurl), install_dir)

        # Checks if the plugin installation path already exists.
        if not self.isInstalled(plugin):
            """Clone the GitHub repository via Plugin URL to install_dir and
            with depth=1 (shallow clone).
            """
            git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1)
            print("Done!")
            return True

        else:
            print("Plugin already installed!")
            return False
项目:infrared    作者:redhat-openstack    | 项目源码 | 文件源码
def test_add_plugin_from_git_exception(plugin_manager_fixture, mocker):

    plugin_manager = plugin_manager_fixture()

    mock_git = mocker.patch("infrared.core.services.plugins.git")
    mock_git.Repo.clone_from.side_effect = git.exc.GitCommandError(
        "some_git_cmd", 1)
    mock_git.exc.GitCommandError = git.exc.GitCommandError
    mock_tempfile = mocker.patch("infrared.core.services.plugins.tempfile")
    mock_shutil = mocker.patch("infrared.core.services.plugins.shutil")
    mock_os = mocker.patch("infrared.core.services.plugins.os")
    mock_os.path.exists.return_value = False

    # add_plugin call
    with pytest.raises(IRFailedToAddPlugin):
        plugin_manager.add_plugin(
            "https://sample_github.null/plugin_repo.git")

    mock_shutil.rmtree.assert_called_with(mock_tempfile.mkdtemp.return_value)
项目:ccu_and_eccu_publish    作者:gaofubin    | 项目源码 | 文件源码
def createGitRepository():
    # Create the git repository, or change into it if it exists
    datastore = 'datastore'
    if not os.path.isdir(datastore): 
        os.makedirs(datastore) 
        git.Repo.init(datastore) 
    repo = git.Repo(datastore)  
    os.chdir(datastore)
    return repo