我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用git.Git()。
def setupTableContextMenu( self, m, addMenu ): super().setupTableContextMenu( m, addMenu ) act = self.ui_actions m.addSection( T_('Status') ) addMenu( m, T_('Diff HEAD vs. Working'), act.tableActionGitDiffHeadVsWorking, act.enablerGitDiffHeadVsWorking, 'toolbar_images/diff.png' ) addMenu( m, T_('Diff HEAD vs. Staged'), act.tableActionGitDiffHeadVsStaged, act.enablerGitDiffHeadVsStaged, 'toolbar_images/diff.png' ) addMenu( m, T_('Diff Staged vs. Working'), act.tableActionGitDiffStagedVsWorking, act.enablerGitDiffStagedVsWorking, 'toolbar_images/diff.png' ) m.addSeparator() addMenu( m, T_('Annotate'), act.tableActionGitAnnotate_Bg, act.enablerTableGitAnnotate ) m.addSection( T_('Git Actions') ) addMenu( m, T_('Stage'), act.tableActionGitStage_Bg, act.enablerGitFilesStage, 'toolbar_images/include.png' ) addMenu( m, T_('Unstage'), act.tableActionGitUnstage_Bg, act.enablerGitFilesUnstage, 'toolbar_images/exclude.png' ) addMenu( m, T_('Revert'), act.tableActionGitRevert_Bg, act.enablerGitFilesRevert, 'toolbar_images/revert.png' ) m.addSeparator() addMenu( m, T_('Rename…'), act.tableActionGitRename_Bg, self.main_window.table_view.enablerTableFilesExists ) addMenu( m, T_('Delete…'), act.tableActionGitDelete_Bg, self.main_window.table_view.enablerTableFilesExists )
def clone(self): """Clone repo and update path to match the current one""" r = self.URL.split('/')[-1].split('.') if len(r) > 1: self.directory = '.'.join(r[0:-1]) else: self.directory = r[0] if self.directory not in os.listdir(): git.Git().clone(self.URL) if self.path is None: self._find_entry_path() elif self.path[0] == '/': self.path = self.path[1:] self.path = os.path.join(self.directory, self.path) else: self.path = os.path.join(self.directory, self.path)
def download_beard(beard_name, upgrade): beard_details = find_record(beard_name) git_ = git.Git("beard_cache") print("Attempting to clone from {}...".format(beard_details['git_url'])) try: repo_dir = join("beard_cache", beard_name) os.makedirs(repo_dir) repo = git.Repo() repo.clone_from(beard_details['git_url'], repo_dir) print("Done!") except FileExistsError: repo = git.Repo("beard_cache/{}".format(beard_name)) if upgrade: print("Updating repo") # Should be origin, since no-one should add another remote. repo.remotes.origin.pull() print("Done!") else: print("Repo already exists. Nothing to do.")
def git_clone(repo_url, ref='master'): ''' :params repo_url - URL of git repo to clone :params ref - branch, commit or reference in the repo to clone Returns a path to the cloned repo ''' if repo_url == '': raise source_exceptions.GitLocationException(repo_url) os.environ['GIT_TERMINAL_PROMPT'] = '0' _tmp_dir = tempfile.mkdtemp(prefix='armada') try: repo = Repo.clone_from(repo_url, _tmp_dir) repo.remotes.origin.fetch(ref) g = Git(repo.working_dir) g.checkout('FETCH_HEAD') except Exception: raise source_exceptions.GitLocationException(repo_url) return _tmp_dir
def getCloneURL(self, name): if self.clone_url is not None: return self.getCloneURLFromPattern(name) # Nothing stops 'name' from escaping the path specified by self.url, like '../../../foo'. I can't see a problem with allowing it other than that it's weird, and allowing normal subdirectory traversal could be useful, so not currently putting any restrictions on 'name' rtn = f"{self.url}/{name}" try: oldEnv = dict(os.environ) os.environ.update(makeGitEnvironment(self)) try: git.Git().ls_remote(rtn) finally: os.environ.clear() os.environ.update(oldEnv) except git.GitCommandError as e: err = e.stderr # Try to strip off the formatting GitCommandError puts on stderr match = re.search("stderr: '(.*)'$", err) if match: err = match.group(1) raise RuntimeError(err) return rtn # Patch stashy's AuthenticationException to print the server's message (mostly for issue #16, detecting a captcha check)
def __init__(self, git_uri, window): self._window = window self._git = Git(git_uri) self._watchdog = WatchDog(self._git.dir) self._watchdog.connect("refresh", self._refresh) self._builder = Gtk.Builder() self._builder.add_from_resource('/com/nautilus/git/ui/location.ui') self._builder.connect_signals({ "open_remote_clicked": self._open_remote_browser, "compare_commits_clicked": self._compare_commits, "popover_clicked": self._trigger_popover, "branch_clicked": self._update_branch }) self._build_widgets()
def install(self, plugin): "Installs a GitHub plugin" print("Installing...", plugin.name) import git # Clone the GitHub repository via the URL. # git.Git().clone(str(plugin.baseurl), install_dir) # Checks if the plugin installation path already exists. if not self.isInstalled(plugin): """Clone the GitHub repository via Plugin URL to install_dir and with depth=1 (shallow clone). """ git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1) print("Done!") return True else: print("Plugin already installed!") return False
def main(argv=None): """ Execute the application. Arguments are taken from sys.argv by default. """ args = _cmdline(argv) logger.start(args.logging_level) logger.info("executing githeat") try: g = Git(os.getcwd()) except (InvalidGitRepositoryError, GitCommandError, GitCommandNotFound): print("Are you sure you're in an initialized git directory?") return 0 githeat = Githeat(g, **vars(args)) githeat.run() logger.info("successful completion") return 0 # Make it executable.
def install_git_(url=None): os.chdir('modules/') if url == None: return "your url is None" try: git.Git().clone(url) message = {'message':'your modul had installed'} os.chdir('..') except Exception: message = {"message":"install failed"} os.chdir('..') return message
def __init__(self, target_dir, url, branch, git_ssh_identity_file=None, force=False): super(GitRepository, self).__init__(target_dir) self.branch = branch try: if url is None: ColorPrint.exit_after_print_messages(message="GIT URL is empty") if git_ssh_identity_file is None: git_ssh_identity_file = os.path.expanduser("~/.ssh/id_rsa") with git.Git().custom_environment(GIT_SSH=git_ssh_identity_file): if not os.path.exists(target_dir) or not os.listdir(target_dir): self.repo = git.Repo.clone_from(url=url, to_path=target_dir) else: self.repo = git.Repo(target_dir) old_url = self.repo.remotes.origin.url if not GitRepository.is_same_host(old_url, url): if self.is_developer_mode(): ColorPrint.exit_after_print_messages( message="This directory exists with not matching git repository " + target_dir) else: shutil.rmtree(target_dir, onerror=FileUtils.remove_readonly) self.repo = git.Repo.clone_from(url=url, to_path=target_dir) self.set_branch(branch=branch, force=force) except git.GitCommandError as exc: ColorPrint.exit_after_print_messages(message=exc.stderr)
def createProject( self, project ): if shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE ) is None: self.app.log.error( '"git" command line tool not found' ) return None try: return wb_git_project.GitProject( self.app, project, self ) except git.exc.InvalidGitRepositoryError as e: self.log.error( 'Failed to add Git repo %r' % (project.path,) ) self.log.error( 'Git error: %s' % (e,) ) return None #------------------------------------------------------------
def addProjectPreInitWizardHandler( self, name, url, wc_path ): self.log.infoheader( 'Initialise Git repository in %s' % (wc_path,) ) self.setStatusAction( T_('Clone %(project)s') % {'project': name} ) # runs on the background thread
def addProjectPreCloneWizardHandler( self, name, url, wc_path ): self.log.infoheader( T_('Cloning Git repository %(url)s into %(path)s') % {'url': url, 'path': wc_path} ) self.setStatusAction( T_('Clone %(project)s') % {'project': name} ) # runs on the background thread
def setTopWindow( self, top_window ): super().setTopWindow( top_window ) tm = self.table_view.table_model self.all_visible_table_columns = (tm.col_staged, tm.col_status, tm.col_name, tm.col_date) prefs = self.app.prefs.git if prefs.program is not None: git.Git.GIT_PYTHON_GIT_EXECUTABLE = str(prefs.program) self.log.info( 'Git using program %s' % (shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE ),) ) wb_git_project.initCallbackServer( self.app ) wb_git_project.setCallbackCredentialsHandler( self.getGitCredentials )
def about( self ): if shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE ) is None: git_ver = '"git" command line tool not found' else: git_ver = 'git %d.%d.%d' % git.Git().version_info return ['GitPython %s' % (git.__version__,) ,git_ver]
def setupToolBarAtLeft( self, addToolBar, addTool ): t = addToolBar( T_('git logo'), style='font-size: 20pt; width: 40px; color: #cc0000' ) self.all_toolbars.append( t ) addTool( t, 'Git', self.main_window.projectActionSettings )
def lambda_handler(event, context): print "event.dump = " + json.dumps(event) responseData = {} # If not valid cloudformation custom resource call try: git.Git().clone(event["ResourceProperties"]["git_url"]) DIR_NAME = "tmp/git" REMOTE_URL = event["ResourceProperties"]["git_url"] if os.path.isdir(DIR_NAME): shutil.rmtree(DIR_NAME) os.mkdir(DIR_NAME) repo = git.Repo.init(DIR_NAME) origin = repo.create_remote('origin',REMOTE_URL) origin.fetch() origin.pull(origin.refs[0].remote_head) print "---- DONE ----" # Foreach Object in the cloned folder, upload to s3 cloned folder. for filename in os.listdir('DIR_NAME'): buffer+= open(filename, 'rU').read() s3.Bucket(event["ResourceProperties"]["bucket_name"]).put_object(Key=filename, Body=buffer) cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, ".zip pulled to S3 Bucket!") except Exception: cfnresponse.send(event, context, cfnresponse.FAILED, responseData, "Bucket Name and Key are all required.") print "ERROR"
def __git_init(self): """ Initialize git repository in the project infrastructure path """ if self._git_repo: return Git().clone(self._git_repo, self._repository_directory) else: return Repo.init(self._repository_directory)
def clone(self): clone_path = self.context.clone_path source_repo = self.context.source['repository']['full_name'] if self.context.clone_depth > 0: # Use shallow clone to speed up clone_kwargs = dict( depth=self.context.clone_depth, branch=self.context.source['branch']['name'] ) if self.context.type == 'commit': # ci retry on commit clone_kwargs['no_single_branch'] = True bitbucket.clone( source_repo, clone_path, **clone_kwargs ) else: # Full clone for ci retry in single commit bitbucket.clone(source_repo, clone_path) gitcmd = git.Git(clone_path) if self.context.target: self._merge_pull_request(gitcmd) else: # Push to branch or ci retry comment on some commit if not self.is_commit_exists(gitcmd, self.commit_hash): logger.info('Unshallowing a shallow cloned repository') output = gitcmd.fetch('--unshallow') logger.info('%s', output) logger.info('Checkout commit %s', self.commit_hash) gitcmd.checkout(self.commit_hash) gitmodules = os.path.join(clone_path, '.gitmodules') if os.path.exists(gitmodules): output = gitcmd.submodule('update', '--init', '--recursive') logger.info('%s', output)
def get_conflicted_files(repo_path): gitcmd = git.Git(repo_path) try: return gitcmd.diff('--name-only', '--diff-filter=U') except git.GitCommandError: logger.exception('Error get conflicted files by git diff command') return None
def clone_repository(self, full_name, path, **kwargs): clone_url = self.get_git_url(full_name) return git.Git().clone(clone_url, path, **kwargs)
def load_package_by_file(self): """ ???? ??? ?? :return: """ # ?? ??? ?? ?? # 1. repository_path ? ?? ???? zip??? ??? ??? ??? ??? ??, Git ????? ?? ?? # 2. repository_path Remote Repository ?? pull ? ?? ? ??? pull? ?? # 3. exception? ????, ???? ?? ???. # ????? ??? ?? ???? ???? ?? ???? ????? ???? # ?????? ??? ?? ?? # ?????? ????? ???? ?? ???? (?/..?) ? _ ? ????. score_package_file = get_valid_filename(self.__score_package) logging.debug(self.__repository_path) package_file = osp.join(self.__repository_path, score_package_file+".zip") logging.debug('load_package_by_file '+str(package_file)) if osp.isfile(package_file): package_zip = zipfile.ZipFile(package_file, 'r') # file exists logging.debug("load local package file and package file exist") # ??? ??? ????? ??? ?? ???. # ??? ???? ?? os.makedirs(self.__package_path, exist_ok=True) # ?? ?? ?? package_zip.extractall(self.__package_path) package_zip.close() # ????? ??? ????? ?? ???. return self.load_package_by_local_repository() else: return False
def load_package_by_remote(self): """ ??? ??????? ???? ?? :return: """ # socore package clone from remote repository repository_url = self.__score_base + ':' + self.__score_package + '.git' # Repository Key ? ?? ?? ???. logging.debug("git Path :"+self.__package_path) # repo = Repo(str(self.__package_path)) git = Git(os.getcwd()) # check deploy key if os.path.exists(conf.DEFAULT_SCORE_REPOSITORY_KEY): st = os.stat(conf.DEFAULT_SCORE_REPOSITORY_KEY) # owner read only if bool(st.st_mode & stat.S_IRGRP or st.st_mode & stat.S_IROTH): os.chmod(conf.DEFAULT_SCORE_REPOSITORY_KEY, 0o600) ssh_cmd = 'ssh -o StrictHostKeyChecking=no -i '+conf.DEFAULT_SCORE_REPOSITORY_KEY logging.debug("SSH KEY COMMAND : "+ssh_cmd) git.custom_environment(GIT_SSH_COMMAND=ssh_cmd) logging.debug(f"load_package_by_remote repository_url({repository_url}) package_path({self.__package_path})") self.__package_repository = Repo._clone(git, repository_url, self.__package_path, GitCmdObjectDB, None) logging.debug(f"load_package_by_remote result({self.__package_repository})") if conf.DEFAULT_SCORE_BRANCH != conf.DEFAULT_SCORE_BRANCH_MASTER: self.__package_repository.git.checkout(conf.DEFAULT_SCORE_BRANCH) return True
def checkout_tag(self,name): repo = Repo('./') repo.git.reset('--hard') o = repo.remotes.origin o.fetch() g = Git('./') g.checkout(name) cbpi.notify("Checkout successful", "Please restart the system") return ('', 204)
def get_add_date(git_path, filename): """Method for getting the initial add/commit date of a file.""" try: gitobj = git.Git(git_path) add_date_string = gitobj.log("--follow", "--format=%aD", "--reverse", filename).splitlines()[0] del gitobj gc.collect() logging.info(filename + " was added on " + add_date_string) return dateutil.parser.parse(add_date_string) except Exception as e: logging.debug("Exception during git log for " + filename + ":\n" + (str(e))) return None
def hackish_pull_list_changed_files(git_path): """Pull new updates from remote origin (hack, using git binary - faster but not as safe as GitPython).""" git_repo = git.Repo(git_path) logging.info(" HEAD: " + str(git_repo.head.commit)) logging.info(" is detached: " + str(git_repo.head.is_detached)) logging.info(" is dirty: " + str(git_repo.is_dirty())) if git_repo.head.is_detached: raise Exception("Detached head") if git_repo.is_dirty(): raise Exception("Dirty repository") del git_repo gc.collect() files = set() git_obj = git.Git(git_path) pull_lines = git_obj.pull().splitlines() del git_obj gc.collect() for line in pull_lines: match = re.search(r'^ (.+) \| .*$', line) if not match is None: changed_files = match.group(1).split('=>') for changed_file in changed_files: files.add(changed_file.strip()) return list(files)
def download_yara_rules_git(): git.Git().clone("https://github.com/Yara-Rules/rules")
def git_clone_shallow(git_url, dst_path): git.Git().clone(git_url, dst_path, '--no-single-branch', depth=1) rval = git.Repo(dst_path) rval.git.fetch(tags=True) return rval
def __init__(self, git_uri): self._git = Git(git_uri) self._watchdog = WatchDog(self._git.dir) self._watchdog.connect("refresh", self._refresh) self._builder = Gtk.Builder() self._builder.add_from_resource('/com/nautilus/git/ui/page.ui') self._build_widgets()
def setup(dataDir): engineDir = os.path.join(dataDir, 'xml') os.makedirs(engineDir, exist_ok=True) XmlStorage.dataDir = engineDir XmlStorage.git = Git(engineDir) return XmlStorage.git
def repo_download(github_repo_url): download_dir = os.path.expanduser('~/.iagitup/downloads') mkdirs(os.path.expanduser('~/.iagitup')) mkdirs(download_dir) # parsing url to initialize the github api rul and get the repo_data gh_user, gh_repo = github_repo_url.split('/')[3:] gh_api_url = "https://api.github.com/repos/{}/{}".format(gh_user,gh_repo) # delete the temp directory if exists repo_folder = download_dir+'/'+gh_repo if os.path.exists(repo_folder): shutil.rmtree(repo_folder) # get the data from GitHub api req = requests.get(gh_api_url) if req.status_code == 200: gh_repo_data = json.loads(req.text) # download the repo from github repo_folder = '{}/{}'.format(download_dir,gh_repo) try: git.Git().clone(gh_repo_data['clone_url'],repo_folder) except Exception as e: print 'Error occurred while downloading: {}'.format(github_repo_url) print str(e) exit(1) else: raise ValueError('Error occurred while downloading: {}'.format(github_repo_url)) return gh_repo_data, repo_folder # get descripton from readme md
def __get_branch_url_from_git(self, path): """Gets repo_url and branch from local git directory. Raises: NotValidGitRepoException: If there is no git directory found. Args: path (:obj:`str`): Path to the git directory. Returns: touple: repo URL and branch name from local git directory """ repo_url = None branch_name = None try: g = git.Git(path) LOG.debug('Found remotes {}'.format(','.join([remote for remote in g.remote().split('\n')]))) for remote_name in ('rhos', 'patches', 'origin'): if remote_name in g.remote().split('\n'): repo_url = g.config("--get", "remote.{}.url".format(remote_name)) LOG.debug('Using Remote {}'.format(remote_name)) break if repo_url is None: raise exceptions.NotValidGitRepoException(path) repo = git.Repo(path) branch = repo.active_branch branch_name = branch.name except (git.exc.InvalidGitRepositoryError, git.exc.GitCommandNotFound, git.exc.GitCommandError) as git_ex: LOG.error(git_ex) raise exceptions.NotValidGitRepoException(path) except TypeError: # If not found directly from git, try to get the branch name # from .gitreview file which should be pointing to the proper # branch. LOG.debug('Fallback method to get branch name from .gitreview') branch_name = self.__get_branch_from_gitreview(path) if not branch_name: # Git repo is most likely in detached state # Fallback method of getting branch name, much slower LOG.debug('HEAD detached, fallback method to get branch name') head_branch = os.linesep.join( [s for s in g.log('--pretty=%d').splitlines() if s and "tag:" not in s]) r_index = head_branch.rfind("/") branch_name = head_branch[r_index + 1:-1] LOG.debug('Component repository: %s' % repo_url) LOG.debug('Component branch: %s' % branch_name) return repo_url, branch_name