我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用six.moves.urllib.parse.quote_plus()。
def run(self, sync): app = sync.app remote = sync.get('projects/%s/branches/' % urlparse.quote_plus(self.project_name)) remote_branches = set() for x in remote: m = self.branch_re.match(x['ref']) if m: remote_branches.add(m.group(1)) with app.db.getSession() as session: local = {} project = session.getProjectByName(self.project_name) for branch in project.branches: local[branch.name] = branch local_branches = set(local.keys()) for name in local_branches-remote_branches: session.delete(local[name]) self.log.info("Deleted branch %s from project %s in local DB.", name, project.name) for name in remote_branches-local_branches: project.createBranch(name) self.log.info("Added branch %s to project %s in local DB.", name, project.name)
def main(total_users, host, user, password, db_name): engine = sqlalchemy.create_engine( 'mysql+pymysql://{user}:{password}@{host}/{db_name}'.format( user=user, password=parse.quote_plus(password), host=host, db_name=db_name)) session = create_session(engine) try: populate_db(session, total_users) finally: session.close()
def format_twitter_post_to_share(post): return quote_plus(format_twitter_post(post))
def test_oauth2_start_flow_default(self): """ Starts a default GlobusAuthorizationCodeFlowManager, Confirms flow is initialized as expected, and can be used. """ # starting with no flow self.assertIsNone(self.cac.current_oauth2_flow_manager) # confirms flow initialized with default flow values flow = self.cac.oauth2_start_flow("uri") self.assertIsInstance(flow, GlobusAuthorizationCodeFlowManager) self.assertEqual(flow.redirect_uri, "uri") self.assertEqual(flow.requested_scopes, " ".join(DEFAULT_REQUESTED_SCOPES)) self.assertEqual(flow.state, "_default") self.assertFalse(flow.refresh_tokens) # confirm client can get url via flow url_res = self.cac.oauth2_get_authorize_url() expected_vals = [self.cac.base_url + "v2/oauth2/authorize?", "client_id=" + self.cac.client_id, "redirect_uri=" + "uri", "scope=" + quote_plus( " ".join(DEFAULT_REQUESTED_SCOPES)), "state=" + "_default", "access_type=" + "online"] for val in expected_vals: self.assertIn(val, url_res) # confirm client can try exchanging code for tokens via flow with self.assertRaises(AuthAPIError) as apiErr: self.cac.oauth2_exchange_code_for_tokens("invalid_code") self.assertEqual(apiErr.exception.http_status, 401) self.assertEqual(apiErr.exception.code, "Error")
def test_oauth2_start_flow_default(self): """ Starts a default GlobusNativeAppFlowManager, Confirms flow is initialized as expected, and can be used. """ # starting with no flow self.assertIsNone(self.nac.current_oauth2_flow_manager) # confirms flow initialized with default flow values flow = self.nac.oauth2_start_flow() self.assertIsInstance(flow, GlobusNativeAppFlowManager) self.assertEqual(flow.redirect_uri, self.nac.base_url + "v2/web/auth-code") self.assertEqual(flow.requested_scopes, " ".join(DEFAULT_REQUESTED_SCOPES)) self.assertEqual(flow.state, "_default") self.assertFalse(flow.refresh_tokens) # confirm client can get url via flow url_res = self.nac.oauth2_get_authorize_url() expected_vals = [self.nac.base_url + "v2/oauth2/authorize?", "client_id=" + self.nac.client_id, "redirect_uri=" + quote_plus(self.nac.base_url + "v2/web/auth-code"), "scope=" + quote_plus( " ".join(DEFAULT_REQUESTED_SCOPES)), "state=" + "_default", "code_challenge=" + quote_plus(flow.challenge), "access_type=" + "online"] for val in expected_vals: self.assertIn(val, url_res) # confirm client can try exchanging code for tokens via flow with self.assertRaises(AuthAPIError) as apiErr: self.nac.oauth2_exchange_code_for_tokens("invalid_code") self.assertEqual(apiErr.exception.http_status, 401) self.assertEqual(apiErr.exception.code, "Error")
def test_oauth2_start_flow_specified(self): """ Starts a GlobusNativeAppFlowManager with specified parameters, Confirms flow is initialized as expected, and can be used. """ # starting with no flow self.assertIsNone(self.nac.current_oauth2_flow_manager) # confirms flow initialized with specified values flow = self.nac.oauth2_start_flow( requested_scopes="scopes", redirect_uri="uri", state="state", verifier=("v" * 43), refresh_tokens=True) self.assertIsInstance(flow, GlobusNativeAppFlowManager) self.assertEqual(flow.redirect_uri, "uri") self.assertEqual(flow.requested_scopes, "scopes") self.assertEqual(flow.state, "state") self.assertTrue(flow.refresh_tokens) # confirm client can get url via flow url_res = self.nac.oauth2_get_authorize_url() verifier, remade_challenge = make_native_app_challenge("v" * 43) expected_vals = [self.nac.base_url + "v2/oauth2/authorize?", "client_id=" + self.nac.client_id, "redirect_uri=" + "uri", "scope=" + "scopes", "state=" + "state", "code_challenge=" + quote_plus(remade_challenge), "access_type=" + "offline"] for val in expected_vals: self.assertIn(val, url_res) # confirm client can try exchanging code for tokens via flow with self.assertRaises(AuthAPIError) as apiErr: self.nac.oauth2_exchange_code_for_tokens("invalid_code") self.assertEqual(apiErr.exception.http_status, 401) self.assertEqual(apiErr.exception.code, "Error")
def build_dynamic_field(self, group, field_meta): """ Builds a field based on JIRA's meta field information """ schema = field_meta['schema'] # set up some defaults for form fields fieldtype = 'text' fkwargs = { 'label': field_meta['name'], 'required': field_meta['required'], } # override defaults based on field configuration if (schema['type'] in ['securitylevel', 'priority'] or schema.get('custom') == JIRA_CUSTOM_FIELD_TYPES['select']): fieldtype = 'select' fkwargs['choices'] = self.make_choices(field_meta.get('allowedValues')) elif field_meta.get('autoCompleteUrl') and \ (schema.get('items') == 'user' or schema['type'] == 'user'): fieldtype = 'select' sentry_url = '/api/0/issues/%s/plugins/%s/autocomplete' % (group.id, self.slug) fkwargs['url'] = '%s?jira_url=%s' % ( sentry_url, quote_plus(field_meta['autoCompleteUrl']), ) fkwargs['has_autocomplete'] = True fkwargs['placeholder'] = 'Start typing to search for a user' elif schema['type'] in ['timetracking']: # TODO: Implement timetracking (currently unsupported alltogether) return None elif schema.get('items') in ['worklog', 'attachment']: # TODO: Implement worklogs and attachments someday return None elif schema['type'] == 'array' and schema['items'] != 'string': fieldtype = 'select' fkwargs.update( { 'multiple': True, 'choices': self.make_choices(field_meta.get('allowedValues')), 'default': [] } ) # break this out, since multiple field types could additionally # be configured to use a custom property instead of a default. if schema.get('custom'): if schema['custom'] == JIRA_CUSTOM_FIELD_TYPES['textarea']: fieldtype = 'textarea' fkwargs['type'] = fieldtype return fkwargs
def encode_uri(uri): split = list(urlsplit(uri)) split[1] = split[1].encode('idna').decode('ascii') split[2] = quote_plus(split[2].encode('utf-8'), '/').decode('ascii') query = list((q, quote_plus(v.encode('utf-8'))) for (q, v) in parse_qsl(split[3])) split[3] = urlencode(query).decode('ascii') return urlunsplit(split)
def get(self, arg, value): """Retrieve single user record by id or username Warnings: User display names are not unique. If using `display_name`, method will fail if multiple Users are returned with the same display name Keyword Args: id (str): Full User ID display_name (str): User display name Returns: User: User instance matching provided inputs Raises: TypeError: Unexpected or more than one keyword argument provided ValueError: No matching user found based on provided inputs, or multiple Users with same display name """ if arg == 'id': response = self._swimlane.request('get', 'user/{}'.format(value)) try: user_data = response.json() except ValueError: raise ValueError('Unable to find user with ID "{}"'.format(value)) return User(self._swimlane, user_data) else: response = self._swimlane.request('get', 'user/search?query={}'.format(quote_plus(value))) matched_users = response.json() # Display name not unique, fail if multiple users share the same target display name target_matches = [] for user_data in matched_users: user_display_name = user_data.get('displayName') if user_display_name == value: target_matches.append(user_data) # No matches if not target_matches: raise ValueError('Unable to find user with display name "{}"'.format(value)) # Multiple matches if len(target_matches) > 1: raise ValueError('Multiple users returned with display name "{}". Matching user IDs: {}'.format( value, ', '.join(['"{}"'.format(r['id']) for r in target_matches]) )) return User(self._swimlane, target_matches[0])
def test_oauth2_get_authorize_url_native(self): """ Starts an auth flow with a NativeAppFlowManager, gets the authorize url validates expected results with both default and specified parameters. """ ac = globus_sdk.AuthClient( client_id=get_client_data()["native_app_client1"]["id"]) # default parameters for starting auth flow flow_manager = globus_sdk.auth.GlobusNativeAppFlowManager(ac) ac.current_oauth2_flow_manager = flow_manager # get url_and validate results url_res = ac.oauth2_get_authorize_url() expected_vals = [ac.base_url + "v2/oauth2/authorize?", "client_id=" + ac.client_id, "redirect_uri=" + quote_plus(ac.base_url + "v2/web/auth-code"), "scope=" + quote_plus( " ".join(DEFAULT_REQUESTED_SCOPES)), "state=" + "_default", "response_type=" + "code", "code_challenge=" + quote_plus(flow_manager.challenge), "code_challenge_method=" + "S256", "access_type=" + "online"] for val in expected_vals: self.assertIn(val, url_res) # starting flow with specified paramaters flow_manager = globus_sdk.auth.GlobusNativeAppFlowManager( ac, requested_scopes="scopes", redirect_uri="uri", state="state", verifier=("a" * 43), refresh_tokens=True) ac.current_oauth2_flow_manager = flow_manager # get url_and validate results url_res = ac.oauth2_get_authorize_url() verifier, remade_challenge = make_native_app_challenge("a" * 43) expected_vals = [ac.base_url + "v2/oauth2/authorize?", "client_id=" + ac.client_id, "redirect_uri=" + "uri", "scope=" + "scopes", "state=" + "state", "response_type=" + "code", "code_challenge=" + quote_plus(remade_challenge), "code_challenge_method=" + "S256", "access_type=" + "offline"] for val in expected_vals: self.assertIn(val, url_res)
def test_oauth2_get_authorize_url_confidential(self): """ Starts an auth flow with a NativeAppFlowManager, gets the authorize url validates expected results with both default and specified parameters. """ ac = globus_sdk.AuthClient( client_id=get_client_data()["confidential_app_client1"]["id"]) # default parameters for starting auth flow flow_manager = globus_sdk.auth.GlobusAuthorizationCodeFlowManager( ac, "uri") ac.current_oauth2_flow_manager = flow_manager # get url_and validate results url_res = ac.oauth2_get_authorize_url() expected_vals = [ac.base_url + "v2/oauth2/authorize?", "client_id=" + ac.client_id, "redirect_uri=" + "uri", "scope=" + quote_plus( " ".join(DEFAULT_REQUESTED_SCOPES)), "state=" + "_default", "response_type=" + "code", "access_type=" + "online"] for val in expected_vals: self.assertIn(val, url_res) # starting flow with specified paramaters flow_manager = globus_sdk.auth.GlobusAuthorizationCodeFlowManager( ac, requested_scopes="scopes", redirect_uri="uri", state="state", refresh_tokens=True) ac.current_oauth2_flow_manager = flow_manager # get url_and validate results url_res = ac.oauth2_get_authorize_url() expected_vals = [ac.base_url + "v2/oauth2/authorize?", "client_id=" + ac.client_id, "redirect_uri=" + "uri", "scope=" + "scopes", "state=" + "state", "response_type=" + "code", "access_type=" + "offline"] for val in expected_vals: self.assertIn(val, url_res)
def selectors_to_qs(selectors): """Convert list of selector dict to query string. :param list selectors: list of dicts representing selectors :returns: querystring :rtype: str|None """ qs = None if not isinstance(selectors, list) or not selectors: return qs qs_list = [] for s in selectors: key = s.get('key') if not key: # invalid w/o key break val = s.get('value') # default missing op to equal with the assumption that the # intent is exists or equal. op = s.get('op', '=') # set-based has equivalence to equality-based, therefore leverage # the set-based formatting if op in ['=', '==', 'in']: # in / equality / exists if val is None: qs_list.append('{}'.format(key)) else: if not isinstance(val, list): val = [val] qs_list.append('{} in ({})'.format(key, ','.join(val))) elif op in ['!=', 'notin']: # not in / non-equality / not exists if val is None: qs_list.append('!{}'.format(key)) else: if not isinstance(val, list): val = [val] qs_list.append('{} notin ({})'.format(key, ','.join(val))) else: # unknown op break else: # successfully processed each selector; format as proper query string qs = '?labelSelector=' + url_parse.quote_plus(','.join(qs_list)) return qs