我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用six.moves.urllib_parse.urlencode()。
def format_yahoo_index_url(symbol, start_date, end_date): """ Format a URL for querying Yahoo Finance for Index data. """ return ( 'http://ichart.finance.yahoo.com/table.csv?' + urlencode({ 's': symbol, # start_date month, zero indexed 'a': start_date.month - 1, # start_date day 'b': start_date.day, # start_date year 'c': start_date.year, # end_date month, zero indexed 'd': end_date.month - 1, # end_date day 'e': end_date.day, # end_date year 'f': end_date.year, # daily frequency 'g': 'd', }) )
def unauthorized_token_request(self): """Return request for unauthorized token (first stage)""" params = self.request_token_extra_arguments() params.update(self.get_scope_argument()) key, secret = self.get_key_and_secret() # decoding='utf-8' produces errors with python-requests on Python3 # since the final URL will be of type bytes decoding = None if six.PY3 else 'utf-8' state = self.get_or_create_state() auth = OAuth1( key, secret, callback_uri=self.get_redirect_uri(state), decoding=decoding, signature_method=SIGNATURE_HMAC, signature_type=SIGNATURE_TYPE_QUERY ) url = self.REQUEST_TOKEN_URL + '?' + urlencode(params) url, _, _ = auth.client.sign(url) return url
def auth_url(self): """Return redirect url""" state = None if self.STATE_PARAMETER or self.REDIRECT_STATE: # Store state in session for further request validation. The state # value is passed as state parameter (as specified in OAuth2 spec), # but also added to redirect_uri, that way we can still verify the # request if the provider doesn't implement the state parameter. # Reuse token if any. name = self.name + '_state' state = self.strategy.session_get(name) or self.state_token() self.strategy.session_set(name, state) params = self.auth_params(state) params.update(self.get_scope_argument()) params.update(self.auth_extra_arguments()) return self.AUTHORIZATION_URL + '?' + urlencode(params)
def modify_start_url(self, start_url): """ Given a SAML redirect URL, parse it and change the ID to a consistent value, so the request is always identical. """ # Parse the SAML Request URL to get the XML being sent to TestShib url_parts = urlparse(start_url) query = dict((k, v[0]) for (k, v) in parse_qs(url_parts.query).items()) xml = OneLogin_Saml2_Utils.decode_base64_and_inflate( query['SAMLRequest'] ) # Modify the XML: xml = xml.decode() xml, changed = re.subn(r'ID="[^"]+"', 'ID="TEST_ID"', xml) self.assertEqual(changed, 1) # Update the URL to use the modified query string: query['SAMLRequest'] = OneLogin_Saml2_Utils.deflate_and_base64_encode( xml ) url_parts = list(url_parts) url_parts[4] = urlencode(query) return urlunparse(url_parts)
def run_test(test_no): """Run a test from its index.""" qs = urlencode({'case': test_no, 'agent': USER_AGENT}) url = server + '/runCase?{}'.format(qs) run_ws(url)
def run_test_cases(case_tuples): """Run a number of test cases from their 'case tuple'""" for case_tuple in case_tuples: print("\n[{}]".format(case_tuple)) qs = urlencode({'agent': USER_AGENT}) url = server + '/runCase?{}'.format(qs) run_ws(url) update_report()
def update_report(): """Tell wstest to update reports.""" print("Updating reports...") qs = urlencode({'agent': USER_AGENT}) url = server + "/updateReports?{}".format(qs) for _ in WebSocket(url): pass
def prepare_request_uri(self, redirect_uri='http://localhost:3000/', scope=list(), force_verify=False, state=''): q = Qry('authorize') q.add_param('response_type', 'token') q.add_param('client_id', self.client_id) q.add_param('redirect_uri', redirect_uri) q.add_param('scope', ' '.join(scope)) q.add_param('force_verify', str(force_verify).lower()) q.add_param('state', state) return '?'.join([q.url, urlencode(q.params)])
def prepare_token_uri(self, scope=list()): q = Qry('token') q.add_param('client_id', self.client_id) q.add_param('client_secret', self.client_secret) q.add_param('grant_type', 'client_credentials') q.add_param('scope', ' '.join(scope)) return '?'.join([q.url, urlencode(q.params)])
def prepare_revoke_uri(self, token): q = Qry('revoke') q.add_param('client_id', self.client_id) q.add_param('token', token) return '?'.join([q.url, urlencode(q.params)])
def auth_url(self): """Return authorization redirect url.""" key, secret = self.get_key_and_secret() callback = self.strategy.absolute_uri(self.redirect_uri) callback = sub(r'^https', 'http', callback) query = urlencode({'cb': callback}) return 'https://www.twilio.com/authorize/{0}?{1}'.format(key, query)
def revoke_token(self, token, uid): if self.REVOKE_TOKEN_URL: url = self.revoke_token_url(token, uid) params = self.revoke_token_params(token, uid) headers = self.revoke_token_headers(token, uid) data = urlencode(params) if self.REVOKE_TOKEN_METHOD != 'GET' \ else None response = self.request(url, params=params, headers=headers, data=data, method=self.REVOKE_TOKEN_METHOD) return self.process_revoke_token_response(response)
def oauth_authorization_request(self, token): """Generate OAuth request to authorize token.""" if not isinstance(token, dict): token = parse_qs(token) params = self.auth_extra_arguments() or {} params.update(self.get_scope_argument()) params[self.OAUTH_TOKEN_PARAMETER_NAME] = token.get( self.OAUTH_TOKEN_PARAMETER_NAME ) state = self.get_or_create_state() params[self.REDIRECT_URI_PARAMETER_NAME] = self.get_redirect_uri(state) return '{0}?{1}'.format(self.authorization_url(), urlencode(params))
def auth_url(self): """Return redirect url""" state = self.get_or_create_state() params = self.auth_params(state) params.update(self.get_scope_argument()) params.update(self.auth_extra_arguments()) params = urlencode(params) if not self.REDIRECT_STATE: # redirect_uri matching is strictly enforced, so match the # providers value exactly. params = unquote(params) return '{0}?{1}'.format(self.authorization_url(), params)
def register_oauth_flow(): """Register URIs used for OAuth flow.""" # https://developer.github.com/v3/oauth/ httpretty.register_uri( httpretty.GET, 'https://github.com/login/oauth/authorize', status=200, body='User required to accept/reject scopes on this page' ) def access_token_callback(request, uri, headers): assert request.method == 'POST' parsed_body = request.parse_request_body(request.body) assert 'client_id' in parsed_body assert 'client_secret' in parsed_body assert 'code' in parsed_body assert 'redirect_uri' in parsed_body if parsed_body['code'][0] == 'bad_verification_code': body = dict( error_uri='http://developer.github.com/v3/oauth/' '#bad-verification-code', error_description='The code passed is ' 'incorrect or expired.', error='bad_verification_code', ) else: body = dict( access_token='%s_token' % parsed_body['code'][0], scope='admin:repo_hook,user:email', token_type='bearer', ) headers['content-type'] = 'application/x-www-form-urlencoded' return ( 200, headers, urllib_parse.urlencode(body) ) httpretty.register_uri( httpretty.POST, 'https://github.com/login/oauth/access_token', body=access_token_callback, )
def record_to_dict(record): # Encodes record title if it is not empty. if record.title: record.title = record.title.encode('ascii', 'ignore').decode('utf-8') bbox = wkt2geom(record.wkt_geometry) min_x, min_y, max_x, max_y = bbox[0], bbox[1], bbox[2], bbox[3] record_dict = { 'title': record.title, 'abstract': record.abstract, 'title_alternate': record.title_alternate, 'checks_list': [], 'bbox': bbox, 'min_x': min_x, 'min_y': min_y, 'max_x': max_x, 'max_y': max_y, 'source': record.source, 'source_type': record.type, 'tile_url': '/layer/%s/wmts/%s/default_grid/{z}/{x}/{y}.png' % (record.identifier, record.title_alternate), 'layer_date': record.date_modified, 'layer_originator': record.creator, 'layer_identifier': record.identifier, 'links': { 'xml': '/'.join(['layer', record.identifier, 'xml']), 'yml': '/'.join(['layer', record.identifier, 'yml']), 'png': '/'.join(['layer', record.identifier, 'png']) }, # 'rectangle': box(min_x, min_y, max_x, max_y), 'layer_geoshape': { 'type': 'envelope', 'coordinates': [ [min_x, max_y], [max_x, min_y] ] } } if(record.format == 'OGC:WMS'): legend_opts = { 'SERVICE': 'WMS', 'VERSION': '1.1.1', 'REQUEST': 'GetLegendGraphic', 'FORMAT': 'image/png', 'LAYER': record.title_alternate } record_dict['legend_url'] = '/layer/%s/service?' % record.identifier + urlencode(legend_opts) record_dict = include_registry_tags(record_dict, record.xml) if record.links: record_dict['references'] = parse_references(record.links) if record.source: record_dict['source_host'] = urlparse(record.source).netloc return record_dict