我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 github.GithubException()。
def list(self, pattern):
    """Yield a ``Repo`` for each repository of an owner whose name matches.

    ``pattern`` is matched against ``self._EXTRACT_PATTERN``; its ``org``
    group names the owner and its optional ``repo`` group is an ``fnmatch``
    glob (defaulting to ``*``). The owner is looked up as an organization
    first and, if that raises ``GithubException``, as a user.

    Raises:
        URIException: if ``pattern`` does not match the expected form.
        RuntimeError: if a matching repository is found while the configured
            clone protocol is neither ssh nor https.
    """
    parsed = self._EXTRACT_PATTERN.match(pattern)
    if not parsed:
        raise URIException(f"Unable to match {pattern},"
                           " please use 'organization[/repo_pattern]'")

    owner = parsed.group("org")
    name_glob = parsed.group("repo") or "*"

    try:
        candidates = self._gh.get_organization(owner).get_repos()
    except github.GithubException:
        # Not an organization: fall back to treating the name as a user.
        candidates = self._gh.get_user(owner).get_repos()

    matching = (
        candidate for candidate in candidates
        if fnmatch.fnmatch(candidate.name, name_glob)
    )
    for candidate in matching:
        protocol = self._clone_protocol
        if protocol == self.CloneProtocol.ssh:
            url = candidate.ssh_url
        elif protocol == self.CloneProtocol.https:
            url = candidate.clone_url
        else:
            raise RuntimeError(f"Invalid protocol selected: {self._clone_protocol}")
        yield Repo(name=candidate.name, url=url)
def run_add_system(name, token, org, system, prompt):
    """Add a new system to the repo by creating a label named after it.

    On success a confirmation is printed and, when ``prompt`` is truthy and
    the user confirms, ``run_update`` is invoked. A ``GithubException`` with
    status 422 is reported as "already exists"; any other one is re-raised.
    """
    target_repo = get_repo(token=token, org=org, name=name)
    try:
        target_repo.create_label(name=system.strip(), color=SYSTEM_LABEL_COLOR)
        click.secho("Successfully added new system {}".format(system), fg="green")
        wants_update = prompt and click.confirm("Run update to re-generate the page?")
        if wants_update:
            run_update(name=name, token=token, org=org)
    except GithubException as error:
        if error.status != 422:
            raise
        click.secho(
            "Unable to add new system {}, it already exists.".format(system),
            fg="yellow")
def test_issue_12(self):
    """A 422 validation error is formatted with its message, details and docs URL."""
    payload = {
        u'documentation_url': u'https://developer.github.com/v3/pulls/#create-a-pull-request',
        u'message': u'Validation Failed',
        u'errors': [{
            u'message': u'No commits between issues-221 and issues-221',
            u'code': u'custom',
            u'resource': u'PullRequest'}
        ]}
    error = github.GithubException(422, payload)

    expected_text = (
        "Unable to create pull request: Validation Failed (422)\n"
        "No commits between issues-221 and issues-221\n"
        "Check "
        "https://developer.github.com/v3/pulls/#create-a-pull-request "
        "for more information."
    )
    actual_text = gpr._format_github_exception("create pull request", error)
    self.assertEqual(expected_text, actual_text)
def get_repository():
    """Return the GitHub repo named in settings, creating it when missing.

    The owner is the organization from ``app_settings.GITHUB_ORG`` when that
    setting is truthy, otherwise the authenticated user. If any
    ``GithubException`` escapes, it is logged and ``None`` is returned.
    """
    try:
        client = Github(**get_github_credentials())
        org_name = app_settings.GITHUB_ORG
        owner = client.get_organization(org_name) if org_name else client.get_user()

        try:
            return owner.get_repo(app_settings.GITHUB_REPO)
        except UnknownObjectException:
            # The repository does not exist yet: create it on the fly.
            logging.info("Creating repository {}".format(
                app_settings.GITHUB_REPO
            ))
            return owner.create_repo(app_settings.GITHUB_REPO)
    except GithubException:
        logging.exception("Unable to configure Github connection.")
        return None
def commit_script(path, script):
    """Commit script string to GitHub repo.

    Updates the file at ``path`` when it already exists, otherwise creates it.

    Only the existence probe is guarded: a "not found" (404) answer from
    ``get_contents`` triggers the initial commit. Any other GitHub error, and
    any error raised by the update itself, propagates to the caller instead
    of being misread as "file does not exist yet".
    """
    repository = get_repository()
    try:
        existing_sha = repository.get_contents(path).sha
    except GithubException as error:
        if error.status != 404:
            raise
        repository.create_file(
            path=path,
            message='Template initial commit',
            content=script
        )
        logging.info("Initial commit of new file {}".format(path))
    else:
        repository.update_file(
            path=path,
            message='Template update',
            content=script,
            sha=existing_sha
        )
def testInvalidInput(self):
    """Creating a key from invalid data raises a 422 with the validation payload."""
    expected_data = {
        "errors": [
            {
                "code": "custom",
                "field": "key",
                "message": "key is invalid. It must begin with 'ssh-rsa' or 'ssh-dss'. Check that you're copying the public half of the key",
                "resource": "PublicKey"
            }
        ],
        "message": "Validation Failed"
    }
    caught = False
    try:
        self.g.get_user().create_key("Bad key", "xxx")
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 422)
        self.assertEqual(error.data, expected_data)
    self.assertTrue(caught)
def handle_exceptions():
    """Build a decorator that turns known exceptions into HTTP responses.

    The decorated coroutine is awaited with the arguments it was called with
    (positional and keyword). Its result is returned unchanged unless it
    raises one of:

    * ``KeyError`` -> 400 response naming the missing key,
    * ``TriggearTimeoutError`` -> 504 response carrying the error text,
    * ``GithubException`` -> response reusing the exception's status, with
      ``str(exception.data)`` as the reason.

    Any other exception propagates.
    """
    def wrapper(func):
        @functools.wraps(func)
        async def wrapped(*args: Any, **kwargs: Any) -> Response:
            try:
                return await func(*args, **kwargs)
            except KeyError as missing_key:
                logging.warning(f"Error in {func}: {missing_key} key is missing")
                return aiohttp.web.Response(text=f'Error: {missing_key} is missing in request', status=400)
            except TriggearTimeoutError as timeout_error:
                logging.exception('Timeout error raised')
                return aiohttp.web.Response(text=str(timeout_error),
                                            reason=f'Timeout when accessing resources: {str(timeout_error)}',
                                            status=504)
            except GithubException as github_exception:
                logging.exception('Github client raised exception')
                return aiohttp.web.Response(reason=str(github_exception.data),
                                            status=github_exception.status)
        return wrapped
    return wrapper
async def test__when_github_entity_is_not_found__status_should_return_404_response_with_github_explanation(self):
    """handle_status answers 404, with GitHub's payload as reason, when the commit lookup raises.

    Declared ``async`` because the body awaits the controller coroutine.
    """
    parameters = {'repository': 'repo', 'sha': 'null', 'state': 'State',
                  'description': 'Description', 'context': 'Context', 'url': 'pr_url'}
    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}},
                   spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)
    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(parameters))
    when(StatusRequestData).is_valid_status_data(parameters).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    github_exception_data = {'message': 'Not Found',
                             'documentation_url': 'https://developer.github.com/v3/'}
    when(github_repository).get_commit('null').thenRaise(
        github.GithubException(404, github_exception_data))

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_status(request)

    # then
    assert response.status == 404
    assert response.reason == str(github_exception_data)
async def test__when_github_entity_is_not_found__comment_should_return_404_response_with_github_explanation(self):
    """handle_comment answers 404, with GitHub's payload as reason, when the commit lookup raises.

    Declared ``async`` because the body awaits the controller coroutine.
    """
    parameters = {'repository': 'repo', 'sha': 'null', 'body': 'Comment body', 'jobName': 'job'}
    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}},
                   spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)
    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(parameters))
    when(CommentRequestData).is_valid_comment_data(parameters).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    github_exception_data = {'message': 'Not Found',
                             'documentation_url': 'https://developer.github.com/v3/'}
    when(github_repository).get_commit('null').thenRaise(
        github.GithubException(404, github_exception_data))

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_comment(request)

    # then
    assert response.status == 404
    assert response.reason == str(github_exception_data)
def testInvalidInput(self):
    """Creating a key from invalid data raises a 422 with the validation payload."""
    raised = False
    try:
        self.g.get_user().create_key("Bad key", "xxx")
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(
            exception.data,
            {
                "errors": [
                    {
                        "code": "custom",
                        "field": "key",
                        "message": "key is invalid. It must begin with 'ssh-rsa' or 'ssh-dss'. Check that you're copying the public half of the key",
                        "resource": "PublicKey"
                    }
                ],
                "message": "Validation Failed"
            }
        )
    self.assertTrue(raised)
def _extract(self): try: git_tree = self.api_repo.get_git_tree(self.api_repo.default_branch, recursive=True) except GithubException: git_tree = None items = git_tree.tree if git_tree else [] files = [item for item in items if item.type == 'blob'] extensions_to_count = defaultdict(int) extensions_to_size = defaultdict(int) for file in files: _, extension = os.path.splitext(file.path) extension = extension.lower() extensions_to_count[extension] += 1 extensions_to_size[extension] += file.size total_count = sum(extensions_to_count.values()) total_size = sum(extensions_to_size.values()) relevant_count = sum(extensions_to_count[ext] for ext in self.extensions_to_check) relevant_size = sum(extensions_to_size[ext] for ext in self.extensions_to_check) self._extension_to_count_feature.value = relevant_count / total_count if total_count else 0 self._extension_to_size_feature.value = relevant_size / total_size if total_size else 0
def fetch_readme(repo):
    """Return the readme content of ``repo``, preferring a cached copy.

    A file named after ``repo.id`` inside ``CACHE_PATH_READMES`` is returned
    verbatim when present. Otherwise the readme is requested from GitHub;
    an empty string is returned (and a warning logged) when that raises
    ``GithubException``.
    """
    cache_file = CACHE_PATH_READMES + os.sep + str(repo.id)

    # Serve from the cache when a file for this repository exists.
    if os.path.isfile(cache_file):
        with open(cache_file, 'r') as cached:
            return cached.read()

    # Make sure the cache folder exists.
    # NOTE(review): nothing in this block writes cache_file -- confirm the
    # cache is populated elsewhere.
    if not os.path.isdir(CACHE_PATH_READMES):
        os.mkdir(CACHE_PATH_READMES)

    try:
        readme = repo.get_readme()
    except github.GithubException:
        # Readme wasn't found
        logging.warning('no readme found for: ' + repo.full_name)
        return ''
    else:
        return readme.content
def get_repos(cls, api, testing=False, level=logging.INFO):
    """Return the set of search-hit URLs that match ``cls.HISTORY_FILE``.

    Runs ``api.search_code(cls.GITHUB_QUERY)`` and collects the ``html_url``
    of every hit, logging each one. When iterating raises
    ``GithubException`` the iteration is retried after a 60 second sleep,
    unless ``testing`` is truthy, in which case collection stops right away.
    ``level`` is accepted but not used here.
    """
    found_urls = []
    seen = 0
    results = api.search_code(cls.GITHUB_QUERY)

    while True:
        try:
            for hit in results:
                seen += 1
                found_urls.append(hit.html_url)
                cls._log.info(
                    "repo nº{x} :\n{y}".format(x=seen, y=hit.html_url))
            break
        except GithubException:
            if testing:
                break
            print("Hit rate limit, sleeping 60 seconds...")
            sleep(60)

    # Duplicates collected across retries collapse in the resulting set.
    return {url for url in found_urls if cls.HISTORY_FILE.search(url)}
def run(self, event, context):
    """Validate the ``.hooks.yml`` of a pull request head and report a status.

    Reads the repository name and head sha from the webhook payload in
    ``event['body']``, fetches ``.hooks.yml`` at that sha and then:

    * reports ``success`` when the file cannot be fetched,
    * reports ``failure`` when the YAML cannot be loaded,
    * reports ``failure`` when schema validation yields errors,
    * reports ``success`` otherwise.
    """
    gh_hook = json.loads(event['body'])
    repo = gh_hook['repository']['full_name']
    sha = gh_hook['pull_request']['head']['sha']

    try:
        hooks_yml = get_github().get_repo(repo, lazy=True).get_file_contents('.hooks.yml', ref=sha)
        logger.info("Fetched .hooks.yml from repo {}".format(repo))
    except github.GithubException:
        logger.error("Missig .hooks.yml on repo {}".format(repo))
        send_status(event, context, gh_hook, self.configname, 'success', ".hooks.yml not present in branch")
        return

    try:
        hook_config = yaml.safe_load(hooks_yml.decoded_content)
        logger.info("Basic yml validation passed")
    except Exception as e:
        # str(e) instead of e.message: the latter does not exist on Python 3
        # exceptions and would make this handler raise AttributeError.
        logger.error("Failed to decode hook yaml: " + str(e))
        send_status(event, context, gh_hook, self.configname, 'failure', "Could not decode branch .hooks.yml")
        return

    logger.info("Advanced schema validation")
    schema_path = os.path.join(os.path.dirname(__file__), "..", "hooks.schema.yml")
    c = Core(source_data=hook_config, schema_files=[schema_path])
    c.validate(raise_exception=False)

    vc = len(c.validation_errors)
    if vc > 0:
        for err in c.validation_errors:
            logger.error(" - {}".format(err))
        send_status(event, context, gh_hook, self.configname, 'failure',
                    ".hooks.yml has {} validation errors; see log".format(vc))
        return

    send_status(event, context, gh_hook, self.configname, 'success', ".hooks.yml present and valid")
def monitor_inventory_metrics(synonym_mappings):
    """Observe inventory and feature metrics for recently pushed repositories.

    For every repository returned by the ``org:soundcloud pushed:>...`` search
    (cut-off roughly three months ago), records the scrape time, observes its
    pull requests and the manifest files found by a code search. When that
    code search raises ``GithubException`` the error is logged and ``[None]``
    is observed instead. Finally, repositories whose last scrape is more than
    one hour old are dropped from ``REPO_SCRAPE_TIMES`` and observed with an
    empty pull list.
    """
    global REPO_SCRAPE_TIMES

    minus_three_months = (datetime.now() - timedelta(3 * 365 / 12)).isoformat().split('.')[0]

    def git():
        return Github(get_access_token())

    for repo in git().search_repositories('org:soundcloud pushed:>' + minus_three_months):
        owner = get_owner(synonym_mappings, repo)

        REPO_SCRAPE_TIMES[(owner, repo.name)] = time.time()

        pulls = list(repo.get_pulls())
        observe_inventory(owner, repo.name, pulls)

        manifests = [None]
        try:
            manifests = list(git().search_code(
                'repo:soundcloud/%s language:json filename:*manifest*.json' % repo.name))
        except GithubException:
            logger.error('Could not search repo %s!' % repo.name)

        observe_features(owner, repo.name, manifests)

    # zero-out deleted repos
    # .items() works on Python 2 and 3 (.iteritems() is Python 2 only); the
    # keys are collected first so the dict is not mutated while iterated.
    stale_before = time.time() - 60 * 60
    dead_repos = [key for key, last_time in REPO_SCRAPE_TIMES.items()
                  if last_time < stale_before]
    for owner, repo_name in dead_repos:
        del REPO_SCRAPE_TIMES[(owner, repo_name)]
        observe_inventory(owner, repo_name, [])
def testAuthorizationHeaderWithLogin(self):
    """Issue one request with login/password credentials, tolerating a GitHub error."""
    # See special case in Framework.fixAuthorizationHeader
    client = github.Github("fake_login", "fake_password")
    try:
        # Reading the attribute is what is exercised; its value is irrelevant.
        client.get_user().name
    except github.GithubException:
        pass
def testAuthorizationHeaderWithToken(self):
    """Issue one request with token credentials, tolerating a GitHub error."""
    # See special case in Framework.fixAuthorizationHeader
    client = github.Github("ZmFrZV9sb2dpbjpmYWtlX3Bhc3N3b3Jk")
    try:
        # Reading the attribute is what is exercised; its value is irrelevant.
        client.get_user().name
    except github.GithubException:
        pass
def testNonJsonDataReturnedByGithub(self):
    """A non-JSON 503 body is exposed under the ``data`` key of the exception data."""
    # Replay data was forged according to https://github.com/jacquev6/PyGithub/pull/182
    expected_data = {
        "data": "<html><body><h1>503 Service Unavailable</h1>No server is available to handle this request.</body></html>",
    }
    caught = False
    try:
        self.g.get_user("jacquev6")
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 503)
        self.assertEqual(error.data, expected_data)
    self.assertTrue(caught)
def testUnknownObject(self):
    """Fetching an unknown repo raises a 404 whose text depends on the Python major version."""
    caught = False
    try:
        self.g.get_user().get_repo("Xxx")
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 404)
        self.assertEqual(error.data, {"message": "Not Found"})
        if atLeastPython26 and atMostPython2:
            expected_text = "404 {u'message': u'Not Found'}"
        else:
            expected_text = "404 {'message': 'Not Found'}"  # pragma no cover (Covered with Python 3)
        self.assertEqual(str(error), expected_text)
    self.assertTrue(caught)
def testBadAuthentication(self):
    """Bad credentials raise a 401 whose text depends on the Python major version."""
    caught = False
    try:
        github.Github("BadUser", "BadPassword").get_user().login
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 401)
        self.assertEqual(error.data, {"message": "Bad credentials"})
        if atLeastPython26 and atMostPython2:
            expected_text = "401 {u'message': u'Bad credentials'}"
        else:
            expected_text = "401 {'message': 'Bad credentials'}"  # pragma no cover (Covered with Python 3)
        self.assertEqual(str(error), expected_text)
    self.assertTrue(caught)
def testExceptionPickling(self):
    """A GithubException survives a pickle round trip without raising."""
    original = github.GithubException('foo', 'bar')
    serialized = pickle.dumps(original)
    pickle.loads(serialized)
def testGetAuthorizationsFailsWhenAutenticatedThroughOAuth(self):
    """Listing authorizations with an OAuth-token client raises a 404."""
    client = github.Github(self.oauth_token)
    caught = False
    try:
        # list() forces the lazily fetched collection to be requested.
        list(client.get_user().get_authorizations())
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 404)
    self.assertTrue(caught)
def testRaiseErrorWithOutBranch(self):
    """Protecting a branch with an empty name raises a 404 'Branch not found'."""
    expected_data = {
        'documentation_url': 'https://developer.github.com/v3/repos/#get-branch',
        'message': 'Branch not found'
    }
    caught = False
    try:
        self.repo.protect_branch("", True, "everyone", ["test"])
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 404)
        self.assertEqual(error.data, expected_data)
    self.assertTrue(caught)
def testRaiseErrorWithBranchProtectionWithOutContext(self):
    """Protecting a branch without contexts raises a 422 'Invalid request'."""
    expected_data = {
        'documentation_url': 'https://developer.github.com/v3',
        'message': 'Invalid request.\n\n"contexts" wasn\'t supplied.'
    }
    caught = False
    try:
        self.repo.protect_branch("master", True, "everyone")
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 422)
        self.assertEqual(error.data, expected_data)
    self.assertTrue(caught)
def testMergeWithConflict(self):
    """Merging conflicting branches raises a 409 'Merge conflict'."""
    caught = False
    try:
        self.repo.merge("branchForBase", "branchForHead")
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 409)
        self.assertEqual(error.data, {"message": "Merge conflict"})
    self.assertTrue(caught)
def testBadSubscribePubSubHubbub(self):
    """Subscribing to an unknown hub event raises a 422 'Invalid event'."""
    expected_data = {"message": "Invalid event: \"non-existing-event\""}
    caught = False
    try:
        self.repo.subscribe_to_hub("non-existing-event", "http://requestb.in/1bc1sc61")
    except github.GithubException as error:
        caught = True
        self.assertEqual(error.status, 422)
        self.assertEqual(error.data, expected_data)
    self.assertTrue(caught)
async def set_pr_sync_label_with_retry(self, repo, pr_number):
    """Add the ``Labels.pr_sync`` label to a pull request, trying up to 3 times.

    Each ``GithubException`` is logged and followed by a one second
    ``asyncio.sleep`` before the next attempt. The function is a coroutine
    because it awaits that sleep.

    Raises:
        TriggearTimeoutError: when all three attempts failed.
    """
    for _attempt in range(3):
        try:
            self.__gh_client.get_repo(repo).get_issue(pr_number).add_to_labels(Labels.pr_sync)
            return
        except github.GithubException as gh_exception:
            logging.exception(f'Exception when trying to set label on PR. Exception: {gh_exception}')
            await asyncio.sleep(1)
    raise TriggearTimeoutError(f'Failed to set label on PR #{pr_number} in repo {repo} after 3 retries')
async def are_files_in_repo(self, files: List[str], hook: HookDetails) -> bool:
    """Tell whether every path in ``files`` can be fetched from the hook's repository.

    Each file is requested at ``hook.sha`` when that is truthy, otherwise at
    ``hook.branch``. The first ``GithubException`` is logged and makes the
    result ``False``; ``True`` is returned when no lookup raised (including
    for an empty ``files``).

    Declared as a coroutine because the caller visible alongside this
    function awaits its result.
    """
    try:
        for file in files:
            self.__gh_client.get_repo(hook.repository).get_file_contents(
                path=file, ref=hook.sha if hook.sha else hook.branch)
    except github.GithubException as gh_exc:
        logging.exception(f"Exception when looking for file {file} in repo {hook.repository} at ref {hook.sha}/{hook.branch} (Exc: {gh_exc})")
        return False
    return True
async def test__handle_synchronize__when_handle_pr_sync_raises_exception__handle_labeled_sync_should_be_called_anyway(self):
    """handle_labeled_sync is still expected once when handle_pr_sync raises; the GithubException surfaces.

    Declared ``async`` because the body awaits the controller coroutine.
    """
    hook_data = {'pull_request': {'number': 12, 'head': {'repo': {'full_name': 'repo'}}}}
    pr_labels = ['label1', 'label2']
    github_controller = GithubController(mock(), mock(), mock(), mock())

    # expect
    expect(github_controller, times=1).get_pr_labels(repository='repo', pr_number=12).thenReturn(pr_labels)
    expect(github_controller, times=1).handle_pr_sync(hook_data, pr_labels).thenRaise(
        GithubException(404, 'PR not found'))
    expect(github_controller, times=1).handle_labeled_sync(hook_data, pr_labels).thenReturn(async_value(None))

    # when
    with pytest.raises(GithubException):
        await github_controller.handle_synchronize(hook_data)
async def test__handle_synchronize__when_handle_pr_and_labeled_sync_raise_exceptions__it_should_be_raised_up(self):
    """When both sync handlers raise, the JenkinsException is the one expected to surface.

    Declared ``async`` because the body awaits the controller coroutine.
    """
    hook_data = {'pull_request': {'number': 12, 'head': {'repo': {'full_name': 'repo'}}}}
    pr_labels = ['label1', 'label2']
    github_controller = GithubController(mock(), mock(), mock(), mock())

    # expect
    expect(github_controller, times=1).get_pr_labels(repository='repo', pr_number=12).thenReturn(pr_labels)
    expect(github_controller, times=1).handle_pr_sync(hook_data, pr_labels).thenRaise(
        GithubException(404, 'PR not found'))
    expect(github_controller, times=1).handle_labeled_sync(hook_data, pr_labels).thenRaise(
        jenkins.JenkinsException())

    # when
    with pytest.raises(jenkins.JenkinsException):
        await github_controller.handle_synchronize(hook_data)
async def test__are_files_in_repo__should_return_false_on_any_missing_file(self):
    """are_files_in_repo is falsy as soon as one file lookup raises GithubException.

    Declared ``async`` because the body awaits the controller coroutine.
    """
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_repo: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    hook_details = mock({'repository': 'repo', 'sha': '123qwe', 'branch': 'master'},
                        spec=HookDetails, strict=True)
    files = ['app/main.py', '.gitignore']

    expect(github_client).get_repo('repo').thenReturn(github_repo)
    expect(github_repo).get_file_contents(path='app/main.py', ref='123qwe').thenReturn(None)
    expect(github_repo).get_file_contents(path='.gitignore', ref='123qwe').thenRaise(
        GithubException(404, 'File not found'))

    assert not await github_controller.are_files_in_repo(files, hook_details)
async def test__when_method_raises_github_exception__should_return_its_status__and_data_as_reason(self):
    """A GithubException raised under handle_exceptions becomes a response with its status and data.

    Declared ``async`` because the body awaits the decorated coroutine.
    """
    @handle_exceptions()
    async def github_exception_raising_coro():
        raise github.GithubException(404, {'message': 'Not found'})

    response: aiohttp.web.Response = await github_exception_raising_coro()

    assert response.status == 404
    assert response.reason == "{'message': 'Not found'}"
def testNonJsonDataReturnedByGithub(self):
    """A non-JSON 503 body is exposed under the ``data`` key of the exception data."""
    # Replay data was forged according to https://github.com/jacquev6/PyGithub/pull/182
    raised = False
    try:
        self.g.get_user("jacquev6")
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 503)
        self.assertEqual(
            exception.data,
            {
                "data": "<html><body><h1>503 Service Unavailable</h1>No server is available to handle this request.</body></html>",
            }
        )
    self.assertTrue(raised)
def testUnknownObject(self):
    """Fetching an unknown repo raises a 404 whose text depends on the Python major version."""
    raised = False
    try:
        self.g.get_user().get_repo("Xxx")
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 404)
        self.assertEqual(exception.data, {"message": "Not Found"})
        if atLeastPython26 and atMostPython2:
            self.assertEqual(str(exception), "404 {u'message': u'Not Found'}")
        else:
            self.assertEqual(str(exception), "404 {'message': 'Not Found'}")  # pragma no cover (Covered with Python 3)
    self.assertTrue(raised)
def testBadAuthentication(self):
    """Bad credentials raise a 401 whose text depends on the Python major version."""
    raised = False
    try:
        github.Github("BadUser", "BadPassword").get_user().login
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 401)
        self.assertEqual(exception.data, {"message": "Bad credentials"})
        if atLeastPython26 and atMostPython2:
            self.assertEqual(str(exception), "401 {u'message': u'Bad credentials'}")
        else:
            self.assertEqual(str(exception), "401 {'message': 'Bad credentials'}")  # pragma no cover (Covered with Python 3)
    self.assertTrue(raised)
def testGetAuthorizationsFailsWhenAutenticatedThroughOAuth(self):
    """Listing authorizations with an OAuth-token client raises a 404."""
    g = github.Github(self.oauth_token)
    raised = False
    try:
        list(g.get_user().get_authorizations())
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 404)
    self.assertTrue(raised)
def testRaiseErrorWithOutBranch(self):
    """Protecting a branch with an empty name raises a 404 'Branch not found'."""
    raised = False
    try:
        self.repo.protect_branch("", True, "everyone", ["test"])
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 404)
        self.assertEqual(
            exception.data, {
                u'documentation_url': u'https://developer.github.com/v3/repos/#get-branch',
                u'message': u'Branch not found'
            }
        )
    self.assertTrue(raised)
def testRaiseErrorWithBranchProtectionWithOutContext(self):
    """Protecting a branch without contexts raises a 422 'Invalid request'."""
    raised = False
    try:
        self.repo.protect_branch("master", True, "everyone")
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(
            exception.data, {
                u'documentation_url': u'https://developer.github.com/v3',
                u'message': u'Invalid request.\n\n"contexts" wasn\'t supplied.'
            }
        )
    self.assertTrue(raised)
def testMergeWithConflict(self):
    """Merging conflicting branches raises a 409 'Merge conflict'."""
    raised = False
    try:
        # The merge result is irrelevant here: the call is expected to raise.
        self.repo.merge("branchForBase", "branchForHead")
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 409)
        self.assertEqual(exception.data, {"message": "Merge conflict"})
    self.assertTrue(raised)
def testBadSubscribePubSubHubbub(self):
    """Subscribing to an unknown hub event raises a 422 'Invalid event'."""
    raised = False
    try:
        self.repo.subscribe_to_hub("non-existing-event", "http://requestb.in/1bc1sc61")
    # "except ... as ..." is accepted by Python 2.6+ and Python 3 alike.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(exception.data, {"message": "Invalid event: \"non-existing-event\""})
    self.assertTrue(raised)