def init_account(self): """Setup a new GitHub account.""" ghuser = self.api.me() # Setup local access tokens to be used by the webhooks hook_token = ProviderToken.create_personal( 'github-webhook', self.user_id, scopes=['webhooks:event'], is_internal=True, ) # Initial structure of extra data self.account.extra_data = dict( id=ghuser.id, login=ghuser.login, name=ghuser.name, tokens=dict( webhook=hook_token.id, ), repos=dict(), last_sync=iso_utcnow(), ) db.session.add(self.account) # Sync data from GitHub, but don't check repository hooks yet. self.sync(hooks=False)
def generate_token(): ui.info_1("Creating new GitHub token") username = ui.ask_string("Please enter you GitHub username") password = getpass.getpass("Password: ") scopes = ['repo'] # Need a different note for each device, otherwise # gh_api.authorize() will fail note = "tsrc-" + str(uuid.uuid4()) note_url = "https://supertanker.github.io/tsrc" def ask_2fa(): return ui.ask_string("2FA code: ") authorization = github3.authorize(username, password, scopes, note=note, note_url=note_url, two_factor_callback=ask_2fa) return authorization.token
def __init__(self, owner, repo, pr, token, url=None): self.github = None # TODO: support non-PR runs try: pr = int(pr) except (ValueError, TypeError): return if not url or url == 'https://github.com': self.github = github3.GitHub(token=token) else: self.github = github3.GitHubEnterprise(url, token=token) self.owner = owner self.repo = repo self.pr = pr self.pull_request = self.github.pull_request(owner, repo, pr) self.commits = self.pr_commits(self.pull_request) self.last_sha = self.commits[-1].sha self.first_sha = self.commits[0].sha self.parent_sha = git.parent_sha(self.first_sha) self.diff = git.diff(self.parent_sha, self.last_sha)
def connect(self): if self.fqdn != GITHUB_COM_FQDN: # upgrade self.gh from a GitHub object to a GitHubEnterprise object gh = github3.GitHubEnterprise(RepositoryService.build_url(self)) self.gh._session.base_url = gh._session.base_url gh._session = self.gh._session self.gh = gh # propagate ssl certificate parameter self.gh._session.verify = self.session_certificate or not self.session_insecure try: self.gh.login(token=self._privatekey) self.username = self.gh.user().login except github3.models.GitHubError as err: if 401 == err.code: if not self._privatekey: raise ConnectionError('Could not connect to Github. ' 'Please configure .gitconfig ' 'with your github private key.') from err else: raise ConnectionError('Could not connect to Github. ' 'Check your configuration and try again.') from err
def get_auth_token(cls, login, password, prompt=None): import platform if cls.fqdn != GITHUB_COM_FQDN: gh = github3.GitHubEnterprise() else: gh = github3.GitHub() gh.login(login, password, two_factor_callback=lambda: prompt('2FA code> ')) try: auth = gh.authorize(login, password, scopes=[ 'repo', 'delete_repo', 'gist' ], note='git-repo2 token used on {}'.format(platform.node()), note_url='https://github.com/guyzmo/git-repo') return auth.token except github3.models.GitHubError as err: if len(err.args) > 0 and 422 == err.args[0].status_code: raise ResourceExistsError("A token already exist for this machine on your github account.") else: raise err
def github_org(sources, env_vars): cmd_gral_part = cmd_composer(env_vars, 'github') git_repositories = {'repositories':[]} gh = github3.GitHub() for organization in sources['repositories']: if gh.organization(organization) is not None: gh_organization = github3.organization(organization) for repo in gh_organization.iter_repos(): repo_url = 'https://github.com/' + organization + '/' + repo.name + '.git' git_repositories['repositories'].append(repo_url) git(git_repositories, env_vars) for repo in gh_organization.iter_repos(): cmd_github_part = ' ' + organization + ' ' + repo.name + ' -t ' + sources['token'] cmd = cmd_gral_part + cmd_github_part os.system(cmd) else: pass
def get_commits(self, sha=None, path=None, author=None, number=-1, etag=None, since=None, until=None): """ Return a list of all commits in a repository :params: Parameters: sha (str) – (optional), sha or branch to start listing commits from path (str) – (optional), commits containing this path will be listed author (str) – (optional), GitHub login, real name, or email to filter commits by (using commit author) number (int) – (optional), number of commits to return. Default: -1 returns all commits etag (str) – (optional), ETag from a previous request to the same endpoint since (datetime or string) – (optional), Only commits after this date will be returned. This can be a datetime or an ISO8601 formatted date string. until (datetime or string) – (optional), Only commits before this date will be returned. This can be a datetime or an ISO8601 formatted date string. :return: a list of Commit """ # TODO: Should investigate proper use of GitHubIterator to help ratelimiting: https://github3py.readthedocs.io/en/master/examples/iterators.html list_commits = [] for c in self.repo.iter_commits(sha, path, author, number, etag, since, until): list_commits.append(c) logging.info('Retrieved ' + str(len(list_commits)) + ' commits from repository with Username: ' + self.user_name + " / Repo: " + self.repo_name + "...") return list_commits
def feeds(self): """List GitHub's timeline resources in Atom format. :returns: dictionary parsed to include URITemplates """ url = self._build_url('feeds') json = self._json(self._get(url), 200) del json['ETag'] del json['Last-Modified'] urls = [ 'timeline_url', 'user_url', 'current_user_public_url', 'current_user_url', 'current_user_actor_url', 'current_user_organization_url', ] for url in urls: json[url] = URITemplate(json[url]) links = json.get('_links', {}) for d in links.values(): d['href'] = URITemplate(d['href']) return json
def login(self, username=None, password=None, token=None, two_factor_callback=None): """Logs the user into GitHub for protected API calls. :param str username: login name :param str password: password for the login :param str token: OAuth token :param func two_factor_callback: (optional), function you implement to provide the Two Factor Authentication code to GitHub when necessary """ if username and password: self._session.basic_auth(username, password) elif token: self._session.token_auth(token) # The Session method handles None for free. self._session.two_factor_auth_callback(two_factor_callback)
def rate_limit(self): """Returns a dictionary with information from /rate_limit. The dictionary has two keys: ``resources`` and ``rate``. In ``resources`` you can access information about ``core`` or ``search``. Note: the ``rate`` key will be deprecated before version 3 of the GitHub API is finalized. Do not rely on that key. Instead, make your code future-proof by using ``core`` in ``resources``, e.g., :: rates = g.rate_limit() rates['resources']['core'] # => your normal ratelimit info rates['resources']['search'] # => your search ratelimit info .. versionadded:: 0.8 :returns: dict """ url = self._build_url('rate_limit') return self._json(self._get(url), 200)
def github_repo_configure(ctx, with_maintainers_file=False): """Configure GitHub repositories.""" conf = ctx.obj['config'] gh = ctx.obj['client'] for repo in conf.repositories: click.echo('Configuring {}'.format(repo.slug)) repoapi = RepositoryAPI(gh, conf=repo) if repoapi.update_settings(): click.echo('Updated settings') if repoapi.update_team(): click.echo('Updated maintainer team') if repoapi.update_branch_protection(): click.echo('Updated branch protection') if with_maintainers_file: click.echo('Checking MAINTAINERS file') if repoapi.update_maintainers_file(): click.echo('Updated MAINTAINERS file') # TODO prevent merge commits
def protect(self, required_status_checks=None, required_pull_request_reviews=None, dismissal_restrictions=None, restrictions=None, enforce_admins=None): """Enable branch protection (with all features).""" data = { 'required_status_checks': required_status_checks, 'required_pull_request_reviews': required_pull_request_reviews, 'dismissal_restrictions': dismissal_restrictions, 'restrictions': restrictions, 'enforce_admins': enforce_admins, } url = self._build_url('protection', base_url=self._api) return self._json( self._put(url, data=dumps(data), headers=self.PREVIEW_HEADERS), 200) # # Wrapper classes for GitHub API. #
def deconstruct_github_url(url): """ Helper function for deconstructing GitHub repository URL and returning its owner and name :param url: GitHub repository URL :type url: str :returns: repository owner and name :rtype: tuple """ if not url.startswith(GITHUB_PREFIX): raise ValueError( "Passed URL is not a GitHub repository: {}".format(url) ) owner, repo = remove_prefix(url, GITHUB_PREFIX).split('/')[:2] return owner, repo
def _github_get_repository(self, conf: dict): """Create repository object according to configuration. """ try: import github3 except ImportError: raise common.InputError(self, "github3 module not found") github = None if conf.get("github_user") and conf.get("github_token"): try: github = github3.login(username=conf.get("github_user"), token=conf.get("github_token")) except Exception as err: raise common.InputError(self, "Github auth error: " + str(err)) if not github: github = github3.GitHub() repository = github.repository(conf["owner"], conf["repository"]) return repository
def all_users(self, number=-1, etag=None, per_page=None, since=None): """Iterate over every user in the order they signed up for GitHub. .. versionchanged:: 1.0.0 Inserted the ``since`` parameter after the ``number`` parameter. :param int number: (optional), number of users to return. Default: -1, returns all of them :param int since: (optional), ID of the last user that you've seen. :param str etag: (optional), ETag from a previous request to the same endpoint :param int per_page: (optional), number of users to list per request :returns: generator of :class:`User <github3.users.User>` """ url = self._build_url('users') return self._iter(int(number), url, users.User, etag=etag, params={'per_page': per_page, 'since': since})
def login(self, username=None, password=None, token=None, two_factor_callback=None): """Logs the user into GitHub for protected API calls. :param str username: login name :param str password: password for the login :param str token: OAuth token :param func two_factor_callback: (optional), function you implement to provide the Two Factor Authentication code to GitHub when necessary """ if username and password: self.session.basic_auth(username, password) elif token: self.session.token_auth(token) # The Session method handles None for free. self.session.two_factor_auth_callback(two_factor_callback)
def project_info(bot, msg): """Show GitHub project information.""" user, repo_name = msg.match.groups() github = github3.GitHub() repo = github.repository(user, repo_name) if repo is not None: msg.respond( '\x0303{user}/{repo}\x03 | \x0308{stars} stars\x03 | \x0314{description}\x03'.format( user=user, repo=repo_name, stars=repo.stargazers, description=repo.description, ), ping=False, )
def issue_info(bot, msg): """Show GitHub project issue information.""" user, repo_name, issue_num = msg.match.groups() github = github3.GitHub() repo = github.repository(user, repo_name) if repo is not None: issue = repo.issue(int(issue_num)) if issue is not None: msg.respond( '\x0314Issue #{num}: {title}\x03 (\x0308{state}\x03, filed by \x0302{user}\x03)'.format( num=issue_num, title=issue.title, state=issue.state, user=issue.user.login, ), ping=False, )
def pr_info(bot, msg): """Show GitHub project pull request information.""" user, repo_name, pr_num = msg.match.groups() github = github3.GitHub() repo = github.repository(user, repo_name) if repo is not None: pr = repo.pull_request(int(pr_num)) if pr is not None: msg.respond( '\x0314PR #{num}: {title}\x03 (\x0308{state}\x03, submitted by \x0302{user}\x03)'.format( num=pr_num, title=pr.title, state=pr.state, user=pr.user.login, ), ping=False, )
def __init__(self, user_id=None): """Create a GitHub API object.""" self.user_id = user_id
def api(self): """Return an authenticated GitHub API.""" return github3.login(token=self.access_token)
def webhook_url(self): """Return the url to be used by a GitHub webhook.""" webhook_token = ProviderToken.query.filter_by( id=self.account.extra_data['tokens']['webhook'] ).first() if webhook_token: wh_url = current_app.config.get('GITHUB_WEBHOOK_RECEIVER_URL') if wh_url: return wh_url.format(token=webhook_token.access_token) else: raise RuntimeError('You must set GITHUB_WEBHOOK_RECEIVER_URL.')
def sync(self, hooks=True, async_hooks=True): """Synchronize user repositories. :param bool hooks: True for syncing hooks. :param bool async_hooks: True for sending of an asynchronous task to sync hooks. .. note:: Syncing happens from GitHub's direction only. This means that we consider the information on GitHub as valid, and we overwrite our own state based on this information. """ active_repos = {} github_repos = {repo.id: repo for repo in self.api.repositories() if repo.permissions['admin']} for gh_repo_id, gh_repo in github_repos.items(): active_repos[gh_repo_id] = { 'id': gh_repo_id, 'full_name': gh_repo.full_name, 'description': gh_repo.description, } if hooks: self._sync_hooks(list(active_repos.keys()), asynchronous=async_hooks) # Remove ownership from repositories that the user has no longer # 'admin' permissions, or have been deleted. Repository.query.filter( Repository.user_id == self.user_id, ~Repository.github_id.in_(github_repos.keys()) ).update(dict(user_id=None, hook=None), synchronize_session=False) # Update repos and last sync self.account.extra_data.update(dict( repos=active_repos, last_sync=iso_utcnow(), )) self.account.extra_data.changed() db.session.add(self.account)
def create_hook(self, repo_id, repo_name): """Create repository hook.""" config = dict( url=self.webhook_url, content_type='json', secret=current_app.config['GITHUB_SHARED_SECRET'], insecure_ssl='1' if current_app.config['GITHUB_INSECURE_SSL'] else '0', ) ghrepo = self.api.repository_with_id(repo_id) if ghrepo: try: hook = ghrepo.create_hook( 'web', # GitHub identifier for webhook service config, events=['release'], ) except github3.GitHubError as e: # Check if hook is already installed hook_errors = (m for m in e.errors if m['code'] == 'custom' and m['resource'] == 'Hook') if next(hook_errors, None): hooks = (h for h in ghrepo.hooks() if h.config.get('url', '') == config['url']) hook = next(hooks, None) if hook: hook.edit(config=config, events=['release']) finally: if hook: Repository.enable(user_id=self.user_id, github_id=repo_id, name=repo_name, hook=hook.id) return True return False
def _dev_api(cls): """Get a developer instance for GitHub API access.""" gh = github3.GitHub() gh.set_client_id(cls.remote.consumer_key, cls.remote.consumer_secret) return gh
def author(self): """Extract the author's GitHub username from a release.""" return self.release.get('author', {}).get('login')
def files(self): """Extract files to download from GitHub payload.""" tag_name = self.release['tag_name'] repo_name = self.repository['full_name'] zipball_url = self.release['zipball_url'] filename = u'{name}-{tag}.zip'.format(name=repo_name, tag=tag_name) response = self.gh.api.session.head(zipball_url) assert response.status_code == 302, \ u'Could not retrieve archive from GitHub: {0}'.format(zipball_url) yield filename, zipball_url
def publish(self): """Publish GitHub release as record.""" with db.session.begin_nested(): deposit = self.deposit_class.create(self.metadata) deposit['_deposit']['created_by'] = self.event.user_id deposit['_deposit']['owners'] = [self.event.user_id] # Fetch the deposit files for key, url in self.files: deposit.files[key] = self.gh.api.session.get( url, stream=True).raw deposit.publish() recid, record = deposit.fetch_published() self.model.recordmetadata = record.model
def _get_files(owner, repo, sha, tokens): """Get repo file paths """ # TODO: use other tokens if first fails github_api = GitHub(token=tokens[0]) repo_api = github_api.repository(owner, repo) # First attempt - use GitHub Tree API files = _get_files_tree_api(repo_api, sha) if files is None: # Tree is trancated - use GitHub Contents API files = _get_files_contents_api(repo_api, sha) log.debug( 'Remaining GitHub API calls: %s', github_api.rate_limit()['rate']['remaining']) return files
def _get_files_tree_api(repo_api, sha): """Get repo file paths using GitHub Tree API. """ files = [] # https://github.com/sigmavirus24/github3.py/issues/656 tree = repo_api.tree('%s?recursive=1' % sha).to_json() if tree['truncated']: return None for item in tree['tree']: if item['type'] == 'blob': files.append(item['path']) return files
def _get_files_contents_api(repo_api, sha, contents=None): """Get repo file paths using GitHub Contents API. """ files = [] if not contents: contents = repo_api.contents('', sha) for key in sorted(contents): item = contents[key] if item.type == 'file': files.append(item.path) elif item.type == 'dir': dir_contents = repo_api.contents(item.path, sha) files.extend( _get_files_contents_api(repo_api, sha, dir_contents)) return files
def __init__(self): if settings.GITHUB_TOKEN: self.github = login(token=settings.GITHUB_TOKEN) else: self.github = GitHub()
def login(): token = ensure_token() gh_api = github3.GitHub() gh_api.login(token=token) ui.info_2("Successfully logged in on GitHub with login", gh_api.user().login) return gh_api
def github_mock(): github_mock = mock.create_autospec(github3.GitHub, instance=True) mock_session = mock.Mock() github_mock._session = mock_session return github_mock
def check_for_update(self): def find_update(): logging.debug("Checking remote for new updates.") try: gh = GitHub() repo = gh.repository("phac-nml", "irida-miseq-uploader") # get the latest tag from github return next(repo.iter_tags(number=1)) except: logging.warn("Couldn't reach github to check for new version.") raise def handle_update(result): latest_tag = result.get() logging.debug("Found latest version: [{}]".format(latest_tag)) release_url = self.url + "/releases/latest" if LooseVersion(self.__app_version__) < LooseVersion(latest_tag.name): logging.info("Newer version found.") dialog = NewVersionMessageDialog( parent=None, id=wx.ID_ANY, message=("A new version of the IRIDA MiSeq " "Uploader tool is available. You can" " download the latest version from "), title="IRIDA MiSeq Uploader update available", download_url=release_url, style=wx.CAPTION|wx.CLOSE_BOX|wx.STAY_ON_TOP) dialog.ShowModal() dialog.Destroy() else: logging.debug("No new versions found.") dr.startWorker(handle_update, find_update)
def __init__(self, *args, **kwarg): self.gh = github3.GitHub() super(GithubService, self).__init__(*args, **kwarg)
def __init__(self, repo_url=None): self.repo_url = repo_url self.user_name = None self.repo_name = None self.github = None # GitHub Object from github3.py self.repo = None self.api_token = config.github_api_token
def __init__(self, _ask_credentials=None, _ask_2fa=None): self.last_error = None self._ask_credentials = _ask_credentials self._ask_2fa = _ask_2fa self.gh = GitHub(token=self._get_authorization_token()) self.user = self.gh.user()
def __init__(self, login='', password='', token=''): super(GitHub, self).__init__({}) if token: self.login(login, token=token) elif login and password: self.login(login, password)
def _repr(self): if self._session.auth: return '<GitHub [{0[0]}]>'.format(self._session.auth) return '<GitHub at 0x{0:x}>'.format(id(self))
def authorize(self, login, password, scopes=None, note='', note_url='', client_id='', client_secret=''): """Obtain an authorization token from the GitHub API for the GitHub API. :param str login: (required) :param str password: (required) :param list scopes: (optional), areas you want this token to apply to, i.e., 'gist', 'user' :param str note: (optional), note about the authorization :param str note_url: (optional), url for the application :param str client_id: (optional), 20 character OAuth client key for which to create a token :param str client_secret: (optional), 40 character OAuth client secret for which to create the token :returns: :class:`Authorization <Authorization>` """ json = None # TODO: Break this behaviour in 1.0 (Don't rely on self._session.auth) auth = None if self._session.auth: auth = self._session.auth elif login and password: auth = (login, password) if auth: url = self._build_url('authorizations') data = {'note': note, 'note_url': note_url, 'client_id': client_id, 'client_secret': client_secret} if scopes: data['scopes'] = scopes with self._session.temporary_basic_auth(*auth): json = self._json(self._post(url, data=data), 201) return Authorization(json, self) if json else None
def emojis(self): """Retrieves a dictionary of all of the emojis that GitHub supports. :returns: dictionary where the key is what would be in between the colons and the value is the URL to the image, e.g., :: { '+1': 'https://github.global.ssl.fastly.net/images/...', # ... } """ url = self._build_url('emojis') return self._json(self._get(url), 200)
def iter_starred(self, login=None, sort=None, direction=None, number=-1, etag=None): """Iterate over repositories starred by ``login`` or the authenticated user. .. versionchanged:: 0.5 Added sort and direction parameters (optional) as per the change in GitHub's API. :param str login: (optional), name of user whose stars you want to see :param str sort: (optional), either 'created' (when the star was created) or 'updated' (when the repository was last pushed to) :param str direction: (optional), either 'asc' or 'desc'. Default: 'desc' :param int number: (optional), number of repositories to return. Default: -1 returns all repositories :param str etag: (optional), ETag from a previous request to the same endpoint :returns: generator of :class:`Repository <github3.repos.Repository>` """ if login: return self.user(login).iter_starred(sort, direction) params = {'sort': sort, 'direction': direction} self._remove_none(params) url = self._build_url('user', 'starred') return self._iter(int(number), url, Repository, params, etag)
def set_client_id(self, id, secret): """Allows the developer to set their client_id and client_secret for their OAuth application. :param str id: 20-character hexidecimal client_id provided by GitHub :param str secret: 40-character hexidecimal client_secret provided by GitHub """ self._session.params = {'client_id': id, 'client_secret': secret}
def _repr(self): return '<GitHub Enterprise [0.url]>'.format(self)
def _repr(self): return '<GitHub Status>'