我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用git.Repo.clone_from()。
def get_by_cluster_id(self, cluster_id): instance = db().query(self.model).\ filter(self.model.env_id == cluster_id).first() if instance is not None: try: instance.repo = Repo(os.path.join(const.REPOS_DIR, instance.repo_name)) except exc.NoSuchPathError: logger.debug("Repo folder does not exist. Cloning repo") self._create_key_file(instance.repo_name, instance.user_key) if instance.user_key: os.environ['GIT_SSH'] = \ self._get_ssh_cmd(instance.repo_name) repo_path = os.path.join(const.REPOS_DIR, instance.repo_name) repo = Repo.clone_from(instance.git_url, repo_path) instance.repo = repo return instance
def create(self, data): if not os.path.exists(const.REPOS_DIR): os.mkdir(const.REPOS_DIR) repo_path = os.path.join(const.REPOS_DIR, data['repo_name']) if os.path.exists(repo_path): logger.debug('Repo directory exists. Removing...') shutil.rmtree(repo_path) user_key = data.get('user_key', '') if user_key: self._create_key_file(data['repo_name'], user_key) os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name']) repo = Repo.clone_from(data['git_url'], repo_path) instance = super(GitRepo, self).create(data) instance.repo = repo return instance
def git_clone(repo_url, ref='master'): ''' :params repo_url - URL of git repo to clone :params ref - branch, commit or reference in the repo to clone Returns a path to the cloned repo ''' if repo_url == '': raise source_exceptions.GitLocationException(repo_url) os.environ['GIT_TERMINAL_PROMPT'] = '0' _tmp_dir = tempfile.mkdtemp(prefix='armada') try: repo = Repo.clone_from(repo_url, _tmp_dir) repo.remotes.origin.fetch(ref) g = Git(repo.working_dir) g.checkout('FETCH_HEAD') except Exception: raise source_exceptions.GitLocationException(repo_url) return _tmp_dir
def init_project(self): project_dir_git = os.path.join(self._project_dir, 'repo') if not self._config['repo']: raise Exception('Empty repo configuration') create_dir(project_dir_git) git_path = os.path.join(project_dir_git, '.git') if os.path.exists(git_path): repo = Repo(project_dir_git) print colored('Pulling', 'green') + ' repo %s' % self._config['repo'] repo.remotes.origin.pull() else: try: print 'cloning... %s' % self._config['repo'] Repo.clone_from(self._config['repo'], project_dir_git) except GitCommandError as e: print 'Repo cannot be found'
def do_clone(self): print "[+] Cloning '{}'".format(self['name']) try: if self['private']: Repo.clone_from(self['address'], self.path(), env=dict(GIT_SSH_COMMAND=self['ssh_cmd'])) else: Repo.clone_from(self['address'], self.path()) dispatcher.update_modules(self) self.update_value('status', 'active') except Exception, e: self['status'] = 'error' self['error_msg'] = 'Could not clone repository, probably due to authentication issues.\n{}'.format(e) self.save() internals = Internals.get(name="updates") internals.update_value("last_update", time())
def clone_student_repo(student, dest=None): """Clones a student repo into a temporary directory Arguments: student (Student) Returns: git.Repo: repo at the cloned path """ logger.info('Cloning repo: %s', student.github) if dest is None: dest = tempfile.mkdtemp() repo_url = student.repo_url logger.debug('%s -> %s', repo_url, dest) GitRepo.clone_from(repo_url, dest) return GitRepo(dest)
def run(self, args): github = self.prompt(args, 'github') email = self.prompt(args, 'email') github_password = self.prompt(args, 'password') org = self.prompt(args, 'org') meta_name = self.prompt(args, 'meta') if not email.endswith('@usc.edu'): print('You must use your USC email address') sys.exit(1) # save the user information to the config file # save into ~/.csci/cscirc by default try: os.mkdir(path.join(path.expanduser('~'), '.csci')) except OSError as e: if e.errno != errno.EEXIST: raise config_path = path.join(path.expanduser('~'), '.csci', 'cscirc') config = Config(config_path) config.github_login = github config.email = email config.github_org = org config.meta_name = meta_name config.github = github_password logger.debug('Writing config to %s', config_path) config.save() logger.info('Cloning meta repo into %s', config.meta_path) GitRepo.clone_from(config.meta_remote, config.meta_path) logger.info('Cloned meta repo')
def main(): parser = argparse.ArgumentParser(description='Find secrets hidden in the depths of git.') parser.add_argument('-o', '--output-file', dest="output_file", help="Filename to write output to (defaults to stdout)") parser.add_argument('git_url', type=str, help='URL for secret searching') args = parser.parse_args() project_path = tempfile.mkdtemp() Repo.clone_from(args.git_url, project_path) repo = Repo(project_path) if args.output_file is not None: with open(args.output_file, "w") as f: utf8_output = codecs.getwriter('utf8') utf8_file = utf8_output(f) find_strings(repo, output_file=utf8_file) else: find_strings(repo) shutil.rmtree(project_path, onerror=del_rw)
def clone_repo(self, local_directory_path=None): """ Clone the Git repository to the given file path. :param local_directory_path: The file path to clone the repository to. If None, then use the default directory path. :return: The Git repository. """ if local_directory_path is None: local_directory_path = self.local_directory_path return Repo.clone_from(self.git_url, local_directory_path) # Protected Methods # Private Methods # Properties
def download_plugin(plugin_id, destination): """ Will download the specified plugin to the specified destination if it is found in the plugin repository. :param plugin_id: The plugin ID to download. :type plugin_id: string :param destination: The destination to download the plugin on disk. :type destination: string """ downloadable_plugins = get_downloadable_plugins() if plugin_id not in downloadable_plugins: log.error('Could not find plugin in repository: %s' %plugin_id) return if os.path.exists(destination): shutil.rmtree(destination) Repo.clone_from(downloadable_plugins[plugin_id]['url'], destination) log.info('%s plugin downloaded' %plugin_id)
def clone_repo(self, repo_dir): Repo.clone_from(self._github_wiki_url, repo_dir)
def install_git(url=None,directory=None,name_modul=None): directory = 'modules/{}'.format(name_modul) if url == None: return "your url is None" if name_modul == None: return "please input your modul" try: Repo.clone_from(url,directory) message = {'message':'your modul had installed'} except Exception: message = {"message":"install failed"} return message
def update(self): if self.need_update(): print("[\033[92mINFO\033[0m] Found an updated version! cloning...") Repo.clone_from(self.url, self.tmp_dir) os.chdir(self.tmp_dir) print("[\033[92mINFO\033[0m] Installing...") if self.install(self.new_version): os.chdir("..") shutil.rmtree(self.tmp_dir) return True else: print("[\033[92mINFO\033[0m] You've got already the last version :)") shutil.rmtree(self.tmp_dir) return False
def clone_repository(user, repo, path): try: if user: Repo.clone_from('https://github.com/%s/%s.git' % (user, repo), path) else: # repo is the gist hash then Repo.clone_from('https://gist.github.com/%s.git' % repo, path) except: print 'Error cloning \'%s\'' % repo
def main(): """lets start our task""" # clone the repo cleanup(LOCAL_WORK_COPY) try: r = Repo.clone_from(git_url, LOCAL_WORK_COPY) except GitCommandError as git_error: print(git_error) exit(-1) d = feedparser.parse( 'https://github.com/mattermost/mattermost-server/releases.atom') release_version = d.entries[0].title[1:] # lets read the dockerfile of the current master dfp = DockerfileParser() with open('./mattermost-openshift-workdir/Dockerfile') as f: dfp.content = f.read() if 'MATTERMOST_VERSION' in dfp.envs: dockerfile_version = dfp.envs['MATTERMOST_VERSION'] # Lets check if we got a new release if semver.compare(release_version, dockerfile_version) == 1: print("Updating from %s to %s" % (dockerfile_version, release_version)) target_branch = 'bots-life/update-to-' + release_version if not pr_in_progress(target_branch): patch_and_push(dfp, r, target_branch, release_version) cleanup(LOCAL_WORK_COPY) create_pr_to_master(target_branch) else: print("There is an open PR for %s, aborting..." % (target_branch)) else: print("we are even with Mattermost %s, no need to update" % release_version)
def install(repo): sheet_dir = get_sheet_path(repo) if os.path.exists(sheet_dir): raise CheatExtException( "%s had been installed at %s" % (repo, sheet_dir)) github_url = get_github_url(repo) Repo.clone_from(github_url, sheet_dir, branch="master") print("%s is installed successfully" % repo)
def prepare(self): self._workspace = tempfile.mkdtemp() self.log("cloning {0}...".format(self.workspace)) self._repo = Repo.clone_from(self._target, self.workspace)
def git_clone(self, name, source): """ Clone the plugin from git repository. :param name: Plugin name. :param source: Source url. :returns: ``True`` if successful. :rtype: bool """ if git_available: target_dir = join(self.path, "plugins", name) temp_dir = join(self.path, "plugins", "temp_" + name) try: # Clone repository to temporary folder repo = Repo.clone_from(source, temp_dir) self.print("cloned '{}' from '{}'".format(name, source)) except: return False # Check if valid uj project. try: UjProject(temp_dir, self) except InvalidUjProjectError: return False # Delete old version of project if exiting rm(target_dir) # Move temp dir to target location and clean. move(temp_dir, target_dir) rm(temp_dir) return True else: return False
def setUpClass(cls): # Deploy path ? clone ?? if osp.exists(cls.__repository_path): shutil.rmtree(cls.__repository_path, True) cls.__repo = Repo.clone_from(cls.__repo_url, cls.__repository_path, branch='master')
def download_addon(name): plugin = cbpi.cache["plugins"].get(name) plugin["loading"] = True if plugin is None: return ('', 404) try: Repo.clone_from(plugin.get("repo_url"), "./modules/plugins/%s/" % (name)) cbpi.notify("Download successful", "Plugin %s downloaded successfully" % name) finally: plugin["loading"] = False return ('', 204)
def init(self, path=None): '''Inicialize repo object or init/clone new Git repo''' if os.path.isdir(self.path): self._repo = Repo.init(path or self.path) else: if self._url: # clone remote is there url mkdir_p(self.path) self._repo = Repo.clone_from(self._url, self.path, branch='master') return self._repo
def _clone_repo(path, git_url): """ Use git to clone locally the neuron in a temp folder :return: """ # clone the repo logger.debug("[ResourcesManager] GIT clone into folder: %s" % path) Utils.print_info("Cloning repository...") # if the folder already exist we remove it if os.path.exists(path): shutil.rmtree(path) else: os.makedirs(path) Repo.clone_from(git_url, path)
def _checkout_if_not_present(repo_url, repos_root): """Returns the existing repository if it exists, otherwises puts a fresh clone in the right place """ repo_name = os.path.basename(urlparse(repo_url).path) local_repo_path = os.path.join(repos_root, repo_name) if os.path.isdir(local_repo_path): return Repo(local_repo_path) else: printerr("Cloning new repo {repo} into {location}".format(repo=repo_name, location=local_repo_path)) return Repo.clone_from(repo_url, local_repo_path)
def clone_repo(self): """Clone a repo and ensures it isn't bare.""" Repo.clone_from(self.repo_url, self.repo_path) self.repo = Repo(self.repo_path) assert not self.repo.bare
def create_local_repo(remote_git,dir,branch): """ Uses the git.clone_from method to clone a remote git repository locally Args: remote_git: Url of the remote git repository dir: Local directory where the contents of the remote git will be downloaded branch: Name of the branch on the remote git which will be used for cloning Returns: git.repo: Reference to the local git repository Raises: git.exc.InvalidGitRepositoryError: If remote git repository is bare git.exc.GitCommandError: If remote git repository does not exist """ if(os.path.exists(dir)): shutil.rmtree(dir) try: repo = Repo.clone_from( url=remote_git, to_path=dir, branch=branch ) if repo.bare: raise git.exc.InvalidGitRepositoryError else: return repo except git.exc.GitCommandError: print "Please make sure you have the correct access rights and the repository exists"
def prepareCaseSourceList(self, gitRepoUrl, envFile=None): """ """ if envFile: ssh_cmd ='ssh -i '+ envFile else: ssh_cmd = 'ssh -i conf/id_rsa' logger.debug("try to clone %s", gitRepoUrl) date = datetime.datetime.now().strftime("%Y%m%d-"+ time.tzname[1] + "-%H%M%S.%f") caseDir = "TestCase-" + date Repo.clone_from(gitRepoUrl, caseDir, branch='master', env={'GIT_SSH_COMMAND': ssh_cmd}) caseDirFullPath = os.path.join(os.getcwd(), caseDir) logger.info("case dir is %s", caseDirFullPath) return caseDirFullPath
def _check_repository(self): """ Check if repository exist and clone it if not. Then fill _repo instance attribute et make a pull. :return: """ if not os.path.exists("%s/.git" % self._repository_path): Repo.clone_from(self.REPOSITORY_ADDRESS, self._repository_path) self._repo = Repo(self._repository_path) self._pull()
def download_dataset(dataset_name, dataset_url): local_git = local_dataset_path(dataset_name) if not os.path.exists(local_git): repo = Repo.clone_from(dataset_url, local_git) print "Downloaded" else: print "Repo exists, pulling latest" repo = Repo(local_git) git = repo.git git.checkout("master") repo.remotes[0].pull() return repo # returns a new session
def update_updater(self, *args): #if self.printer_model == 'Robo R2': # branch = 'r2' #else: # branch = 'c2' branch = 'master' if os.path.exists(self.repo_local_path): def del_rw(action, name, exc): os.chmod(name, stat.S_IWRITE) os.remove(name) # start fresh every time to avoid potential corruptions or misconfigurations rmtree(self.repo_local_path, onerror=del_rw) Repo.clone_from(self.repo_remote_path, self.repo_local_path, branch=branch)
def update_local_repo(): if not os.path.isdir(LOCAL_REPO_DIR): os.makedirs(LOCAL_REPO_DIR) if not os.path.isdir(RECIPE_REPO_DIR): Repo.clone_from(GITHUB_URL, RECIPE_REPO_DIR) Repo(RECIPE_REPO_DIR).remotes.origin.pull()
def clone_github_repo(self): """ Clones a github repo with a username and repository_name """ if not (self.username and self.repository_name): return repository_local_destination = join(MODULES_PATH, 'github', self.username, self.repository_name) if not exists(repository_local_destination): Repo.clone_from(self.repo_url, repository_local_destination, branch='master') init_filename = join(repository_local_destination, '__init__.py') open(init_filename, 'a').close()
def clone_github_repo(self): """ Clones a github repo with a username and repository_name """ repository_local_destination = os.path.join(MODULES_PATH, 'github', self.username, self.repository_name) if not os.path.exists(repository_local_destination): Repo.clone_from(self.repo_url, repository_local_destination, branch='master') init_filename = os.path.join(repository_local_destination, '__init__.py') open(init_filename, 'a').close()
def update_updater(self, *args): if self.printer_model == 'Robo R2': branch = 'r2' else: branch = 'c2' if os.path.exists(self.repo_local_path): # start fresh every time to avoid potential corruptions or misconfigurations rmtree(self.repo_local_path) Repo.clone_from(self.repo_remote_path, self.repo_local_path, branch=branch)
def clone(repo, path): """Clone a given repository object to the specified path. """ try: shutil.rmtree(path) except FileNotFoundError: pass finally: logging.debug("cloning {}".format(repo.clone_url)) repo_local = Repo.clone_from(repo.clone_url, path)
def main(): peepdf_path = os.path.join(VENDOR_ROOT, "peepdf") if not os.path.isfile(os.path.join(peepdf_path, "peepdf.py")): Repo.clone_from('https://github.com/jesparza/peepdf', peepdf_path)
def main(): try: import volatility except ImportError: volpath = os.path.join(VENDOR_ROOT, "volatility") setup_script = os.path.join(volpath, "setup.py") rmtree(volpath, True) Repo.clone_from("https://github.com/volatilityfoundation/volatility.git", volpath) os.chdir(volpath) call(['python', setup_script, 'install'])
def main(): decoders_path = os.path.join(VENDOR_ROOT, 'RATDecoders') if os.path.exists(decoders_path): repo = Repo(decoders_path) repo.remotes.origin.pull() else: Repo.clone_from("https://github.com/kevthehermit/RATDecoders.git", decoders_path)
def install(self): try: Repo.clone_from(self.url, self.app.config['SKILL_FOLDER'] + self.name) json.path = self.app.config['SKILL_FOLDER'] + self.name + "/package.json" data = json.decode_path() self.install_dep(data) self.install_pip(data) skill = get_skill_function(self.app.config['SKILL_FOLDER'], self.name) if (hasattr(skill, 'create_skill') and callable(skill.create_skill)): Module = skill.create_skill() if (hasattr(Module, 'install') and callable(Module.install)): try: Module.install() except Exception as e: logger.error('Install Skill error for ' + self.name + ' : ' + str(e)) if (hasattr(Module, 'get_blueprint') and callable(Module.get_blueprint)): self.app.register_blueprint(Module.get_blueprint()) if data['navbar'] == 'True': navbar.folder = self.name navbar.set_plugin_navbar() os.system('cd ' + self.app.config['SKILL_FOLDER'] + self.name + ' && make compilelang') bot = kernel.set() kernel.train(bot) logger.info('Installation done with success') return json.encode({"status":"success"}) except Exception as e: logger.error('Installation error : ' + str(e)) return json.encode({"status":"error"})
def get_data(self): try: Repo.clone_from('https://github.com/OnyxProject/onyx-data', onyx.__path__[0] + "/data/") logger.info('Successfully get Data') except Exception as e: logger.error('Get Data error : ' + str(e)) raise DataException(str(e))
def clone(self, outdir): LOG.debug("Cloning '%s' to '%s'", self.url, outdir) kwargs = {'recursive': True} if self._depth: LOG.debug("Cloning with depth=%d", self._depth) kwargs['depth'] = self._depth self._repo = Repo.clone_from(self.url, outdir, **kwargs) git = self._repo.git git.checkout(self.branch) self._invalidate_attrs()
def update(ctx): await say(ctx.message, "Updating...") import platform try: from git import Repo except ImportError: await say(ctx.message, "Please install the module gitpython.".format(pip_os)) return import shutil from distutils.dir_util import copy_tree import stat try: os.remove("oldconfig.json") except OSError: pass os.rename("config.json", "oldconfig.json") os.remove("embedbot.py") #lol r i p embedbot if this doesn't work r i p my work os.remove("README.md") os.remove("requirements.txt") repo_url = "https://www.github.com/Luigimaster1/embedbot.git" local_dir = "./tempupdate/" Repo.clone_from(repo_url, local_dir) def del_rw(action, name, exc): os.chmod(name, stat.S_IWRITE) os.remove(name) shutil.rmtree("./tempupdate/.git/", onerror=del_rw) copy_tree(local_dir, ".") shutil.rmtree("./tempupdate/", onerror=del_rw) os.remove("botinfo.json") await say(ctx.message, "The bot has been updated. Please restart the bot.")
def repo_clone(repo_dir, repo_url): """Clone repository to this host.""" repo = Repo.clone_from(repo_url, repo_dir) return repo
def clone_git_repo(git_url): project_path = tempfile.mkdtemp() Repo.clone_from(git_url, project_path) return project_path
def clone_repository(repository): repo_url = repository.url repo_path = repository.get_local_repo_path() Frontend.set_auth(repository) try: repo = Repo.clone_from(repository.url, repository.get_local_repo_path(), branch='master') repository.conn_ok = True except: repository.conn_ok = False repository.save()
def fetch(url, rev, path): repo = Repo.clone_from(url, path) if repo.bare: raise RuntimeError('Empty repo ' + url) git = repo.git git.checkout(rev) repo.create_head(rev) return repo.head.object.hexsha
def get_repo(self): path = self.get_path() if os.path.exists(path): self.logger.info('found_repo') return Repo(path) self.logger.info('cloning_repo') return Repo.clone_from(self.uri, path)