The following 49 code examples, extracted from open-source Python projects, illustrate how to use urllib.parse.urlencode().
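Before the project examples, here is a minimal sketch of what urlencode() does on its own; the query keys and values below are made up purely for illustration:

from urllib.parse import urlencode

# A flat mapping becomes a percent-encoded key=value query string
# (spaces become '+' because quote_plus is the default quoting function).
print(urlencode({'q': 'python urlencode', 'page': 2}))
# -> q=python+urlencode&page=2

# With doseq=True, sequence values expand into repeated keys instead of
# being rendered as a Python list literal.
print(urlencode({'tag': ['a', 'b']}, doseq=True))
# -> tag=a&tag=b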
def url_builder(self, endpoint, *, root=None, params=None, url_params=None):
    """Create a URL for the specified endpoint.

    Arguments:
      endpoint (:py:class:`str`): The API endpoint to access.
      root (:py:class:`str`, optional): The root URL for the service API.
      params (:py:class:`dict`, optional): The values to format into the
        created URL (defaults to ``None``).
      url_params (:py:class:`dict`, optional): Parameters to add to the
        end of the URL (defaults to ``None``).

    Returns:
      :py:class:`str`: The resulting URL.

    """
    if root is None:
        root = self.ROOT
    return ''.join([
        root,
        endpoint,
        '?' + urlencode(url_params) if url_params else '',
    ]).format(**params or {})
def request_encode_url(self, method, url, fields=None, headers=None,
                       **urlopen_kw):
    """
    Make a request using :meth:`urlopen` with the ``fields`` encoded in
    the url. This is useful for request methods like GET, HEAD, DELETE, etc.
    """
    if headers is None:
        headers = self.headers

    extra_kw = {'headers': headers}
    extra_kw.update(urlopen_kw)

    if fields:
        url += '?' + urlencode(fields)

    return self.urlopen(method, url, **extra_kw)
def existing_tags(target_uri, h):  # , doi, text, h):
    params = {
        'limit': 200,
        'uri': target_uri,
        'group': h.group,
        'user': h.username,
    }
    query_url = h.query_url_template.format(query=urlencode(params, True))
    obj = h.authenticated_api_query(query_url)
    rows = obj['rows']
    tags = {}
    unresolved_exacts = {}
    for row in rows:
        for tag in row['tags']:
            if tag.startswith('RRID:'):
                tags[tag] = row['id']
            elif tag.startswith('PMID:'):
                tags[tag] = row['id']
            elif tag.startswith('DOI:'):
                tags[tag] = row['id']
            elif tag == 'RRIDCUR:Unresolved':
                unresolved_exacts[row['target'][0]['selector'][0]['exact']] = row['id']
    return tags, unresolved_exacts
def net_billings(self, username, now_bytes_total):
    global monitor_vnodes
    if username not in self.net_lastbillings.keys():
        self.net_lastbillings[username] = 0
    elif int(now_bytes_total / self.bytes_per_beans) < self.net_lastbillings[username]:
        self.net_lastbillings[username] = 0
    diff = int(now_bytes_total / self.bytes_per_beans) - self.net_lastbillings[username]
    if diff > 0:
        auth_key = env.getenv('AUTH_KEY')
        data = {"owner_name": username, "billing": diff, "auth_key": auth_key}
        header = {'Content-Type': 'application/x-www-form-urlencoded'}
        http = Http()
        [resp, content] = http.request("http://" + self.master_ip + "/billing/beans/",
                                       "POST", urlencode(data), headers=header)
        logger.info("response from master:" + content.decode('utf-8'))
    self.net_lastbillings[username] += diff
    monitor_vnodes[username]['net_stats']['net_billings'] = self.net_lastbillings[username]
def getCsvReport(product_list, startdate, enddate, source_obj):
    print()
    print("Requesting a csv report for the given time period")
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }
    path = "/billing-usage/v1/contractUsageData/csv"
    parameters = {
        "reportSources": [source_obj],
        "products": product_list,
        "startDate": startdate,
        "endDate": enddate
    }
    print()
    data_string = parse.urlencode({p: json.dumps(parameters[p]) for p in parameters})
    products_result = session.post(parse.urljoin(baseurl, path), data=data_string, headers=headers)
    products_csv = products_result.text
    return products_csv
def make_pagination_headers(limit, curpage, total, link_header=True):
    """Return Link Hypermedia Header."""
    lastpage = int(math.ceil(1.0 * total / limit) - 1)
    headers = {'X-Total-Count': str(total), 'X-Limit': str(limit),
               'X-Page-Last': str(lastpage), 'X-Page': str(curpage)}
    if not link_header:
        return headers
    base = "{}?%s".format(request.path)
    links = {}
    links['first'] = base % urlencode(dict(request.args, **{PAGE_ARG: 0}))
    links['last'] = base % urlencode(dict(request.args, **{PAGE_ARG: lastpage}))
    if curpage:
        links['prev'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage - 1}))
    if curpage < lastpage:
        links['next'] = base % urlencode(dict(request.args, **{PAGE_ARG: curpage + 1}))
    headers['Link'] = ",".join(['<%s>; rel="%s"' % (v, n) for n, v in links.items()])
    return headers

# pylama:ignore=R0201
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))

    return message
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archives between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

    timestamps = []
    # Ignore the first line which contains column names
    for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
        # Ignore archives with a status code != OK
        if status_code == '200':
            timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
def url_for(text, font, color, back_color, size_fixed=False, align='center', stretch=True):
    base_url = app.config['SITE_BASE_URL']
    payload = {
        'text': text,
        'font': font,
        'color': color,
        'back_color': back_color,
        'size_fixed': str(size_fixed).lower(),
        'align': align,
        'stretch': str(stretch).lower()
    }
    return urljoin(base_url, 'emoji') + '?' + urlencode(payload)
def respond_to_checkpoint(self, response_code):
    headers = {
        'User-Agent': self.USER_AGENT,
        'Origin': 'https://i.instagram.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US',
        'Accept-Encoding': 'gzip',
        'Referer': self.endpoint,
        'Cookie': self.cookie,
    }

    req = Request(self.endpoint, headers=headers)
    data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
    res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

    if res.info().get('Content-Encoding') == 'gzip':
        buf = BytesIO(res.read())
        content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
    else:
        content = res.read().decode('utf-8')

    return res.code, content
def generate_request_url(self, scopes: tuple):
    """Generate OAuth request url.

    :param scopes: github access scopes
        (https://developer.github.com/v3/oauth/#scopes)
    """
    state = base64.b64encode(os.urandom(15)).decode("ascii")
    self._requested_scopes[state] = scopes
    qs = parse.urlencode({
        "client_id": self._client_id,
        "scope": ",".join(scopes),
        "state": state,
    })
    return "%s?%s" % (REQ_ACCESS_URL, qs)
def test_05multiple_request(self):
    response = self.fetch(
        '/cluster?{}'.format(parse.urlencode({'method': 'node_config',
                                              'cluster': 'my cluster',
                                              'node': server1})),
        headers={'Key': 'new key'})
    self.assertEqual(response.code, 200)
    commands = json.loads(response.body.decode('utf-8'))
    self.assertEqual(len(commands), 1)
    self.assertEqual(commands[0][:30], 'python3 valuation_standalone_m')

    response = self.fetch(
        '/cluster?{}'.format(parse.urlencode({'method': 'node_config',
                                              'cluster': 'my cluster',
                                              'node': server2})),
        headers={'Key': 'new key'})
    self.assertEqual(response.code, 200)
    commands = json.loads(response.body.decode('utf-8'))
    print(response)
    self.assertEqual(commands[0][:27], 'python3 valuation_worker.py')
    self.assertEqual(commands[1][:27], 'python3 valuation_worker.py')
def cluster_list(self):
    """
    Get the list of all the clusters created by this user

    :return: A dictionary with all the clusters
    """
    arguments = {'method': 'clusters_list'}
    client = HTTPClient()
    response = client.fetch(
        '{}/cluster?{}'.format(self.uri, parse.urlencode(arguments)),
        headers={'Key': self.uk}
    )
    if response.code == 200:
        return json.loads(response.body.decode('utf-8'))
    else:
        raise ValueError(response.body.decode('utf-8'))
def request(self, cluster_key, configuration):
    """
    As an available resource, pass the configuration to the registry and
    get the commands that have to be run.

    :param cluster_key: Key of the cluster the resource wants to connect to
    :param configuration: String with the configuration of the resource
    :return: List with the commands that have to be run.
    """
    arguments = {
        'method': 'node_config',
        'cluster': cluster_key,
        'node': configuration
    }
    client = HTTPClient()
    response = client.fetch(
        '{}/cluster?{}'.format(self.uri, parse.urlencode(arguments)),
        headers={'Key': self.uk}
    )
    if response.code == 200:
        return json.loads(response.body.decode('utf-8'))
    else:
        raise ValueError(response.body.decode('utf-8'))
def cluster_status(self, cluster_key):
    """
    Get the present status of the cluster

    :param cluster_key: Key that identifies the cluster
    :return: The status of the cluster
    """
    arguments = {
        'method': 'cluster_status',
        'cluster': cluster_key
    }
    client = HTTPClient()
    response = client.fetch(
        '{}/cluster?{}'.format(self.uri, parse.urlencode(arguments)),
        headers={'Key': self.uk}
    )
    if response.code == 200:
        if response.body:
            return json.loads(response.body.decode('utf-8'))
        else:
            return "Empty"
    else:
        raise ValueError(response.body.decode('utf-8'))
def cluster_reset(self, cluster_key):
    """
    Reset the status of a cluster. This means that all the temporary
    information about which resources made requests is forgotten. This may
    leave configured resources not properly configured, so handle with care.

    :param cluster_key: Key that identifies the cluster
    :return: Key of the cluster being reset
    """
    arguments = {
        'method': 'cluster_reset',
        'cluster': cluster_key
    }
    client = HTTPClient()
    response = client.fetch(
        '{}/cluster?{}'.format(self.uri, parse.urlencode(arguments)),
        headers={'Key': self.uk}
    )
    if response.code == 200:
        return response.body.decode('utf-8')
    else:
        raise ValueError(response.body.decode('utf-8'))
def request(uri, cluster_key, configuration):
    """
    As an available resource, pass the configuration to the registry and
    get the commands that have to be run.

    :param uri: Address of the Registry
    :param cluster_key: Key of the cluster the resource wants to connect to
    :param configuration: String with the configuration of the resource
    :return: List with the commands that have to be run.
    """
    arguments = {
        'method': 'node_config',
        'cluster': cluster_key,
        'node': configuration
    }
    client = HTTPClient()
    response = client.fetch(
        '{}/cluster?{}'.format(uri, parse.urlencode(arguments))
    )
    if response.code == 200:
        return json.loads(response.body.decode('utf-8'))
    else:
        raise ValueError(response.body.decode('utf-8'))
def download(self, fr=None, to=None):
    """
    Download the log lines, which you may filter by time

    :param fr: datetime. Log lines from
    :param to: datetime. Log lines to
    :return: A list with dicts
    """
    arguments = {'cluster': self.cluster}
    if fr:
        arguments['fr'] = fr
    if to:
        arguments['to'] = to

    client = HTTPClient()
    response = client.fetch(
        '{}/logs?{}'.format(self.uri, parse.urlencode(arguments)),
    )
    return json.loads(response.body.decode('utf-8'))
def unicode_urlencode(query, doseq=True):
    """
    Custom wrapper around urlencode to support unicode

    Python urlencode doesn't handle unicode well so we need to convert to
    bytestrings before using it:
    http://stackoverflow.com/questions/6480723/urllib-urlencode-doesnt-like-unicode-values-how-about-this-workaround
    """
    pairs = []
    for key, value in query.items():
        if isinstance(value, list):
            value = list(map(to_utf8, value))
        else:
            value = to_utf8(value)
        pairs.append((to_utf8(key), value))
    encoded_query = dict(pairs)
    return urlencode(encoded_query, doseq)
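A possible way to call the wrapper above is sketched below; to_utf8 is assumed to be a small helper from the same project that encodes str to UTF-8 bytes, so the stub here only stands in for that assumption:

# Hypothetical stand-in for the project's to_utf8 helper (assumption).
def to_utf8(value):
    return value.encode('utf-8') if isinstance(value, str) else value

# Non-ASCII keys and values, plus list values, end up percent-encoded
# as repeated keys thanks to doseq=True.
print(unicode_urlencode({'name': 'café', 'tags': ['α', 'β']}))
# -> name=caf%C3%A9&tags=%CE%B1&tags=%CE%B2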
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    from urllib.parse import urlencode
    Authentication.__init__(self, credentials, host, request_uri, headers,
                            response, content, http)
    challenge = _parse_www_authenticate(response, 'www-authenticate')
    service = challenge['googlelogin'].get('service', 'xapi')
    # Blogger actually returns the service in the challenge
    # For the rest we guess based on the URI
    if service == 'xapi' and request_uri.find("calendar") > 0:
        service = "cl"
    # No point in guessing Base or Spreadsheet
    # elif request_uri.find("spreadsheets") > 0:
    #     service = "wise"

    auth = dict(Email=credentials[0], Passwd=credentials[1], service=service,
                source=headers['user-agent'])
    resp, content = self.http.request("https://www.google.com/accounts/ClientLogin",
                                      method="POST",
                                      body=urlencode(auth),
                                      headers={'Content-Type': 'application/x-www-form-urlencoded'})
    lines = content.split('\n')
    d = dict([tuple(line.split("=", 1)) for line in lines if line])
    if resp.status == 403:
        self.Auth = ""
    else:
        self.Auth = d['Auth']
def test_auth_url(self):
    perms = ['email', 'birthday']
    redirect_url = 'https://localhost/facebook/callback/'
    expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode(
        dict(client_id=self.app_id,
             redirect_uri=redirect_url,
             scope=','.join(perms)))
    actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

    # Since the order of the query string parameters might be
    # different in each URL, we cannot just compare them to each
    # other.
    expected_url_result = urlparse(expected_url)
    actual_url_result = urlparse(actual_url)
    expected_query = parse_qs(expected_url_result.query)
    actual_query = parse_qs(actual_url_result.query)

    self.assertEqual(actual_url_result.scheme, expected_url_result.scheme)
    self.assertEqual(actual_url_result.netloc, expected_url_result.netloc)
    self.assertEqual(actual_url_result.path, expected_url_result.path)
    self.assertEqual(actual_url_result.params, expected_url_result.params)
    self.assertEqual(actual_query, expected_query)
def _getpage(self, page):
    data = {
        'mid': str(self._mid),
        'pagesize': '30',
        'tid': '0',
        'page': str(page),
        'keyword': '',
        'order': 'senddate',
        '_': '1496812411295'
    }
    # http://space.bilibili.com/ajax/member/getSubmitVideos?mid=15989779
    # &pagesize=30&tid=0&page=1&keyword=&order=senddate&_=1496812411295
    url = "http://space.bilibili.com/ajax/member/getSubmitVideos?" + urlencode(data)
    try:
        response = requests.get(url)
        if response.status_code != 200:
            return None
        html_cont = response.text
        return html_cont
    except RequestException:
        return None
def _tpl_send_sms(phone, tpl_id, tpl_value):
    """
    Send a template SMS, e.g. tpl_value = {'#code#': '1234', '#company#': '???'}
    """
    if not isValidPhone(phone):
        return False
    if isTestPhone(phone):
        return False

    apikey = settings.YUNPIAN_API_KEY  # get apikey from global settings
    params = {'apikey': apikey,
              'tpl_id': tpl_id,
              'tpl_value': urlencode(tpl_value),
              'mobile': phone}
    url = "https://sms.yunpian.com/v1/sms/tpl_send.json"
    headers = {"Content-type": "application/x-www-form-urlencoded",
               "Accept": "text/plain"}
    response = requests.post(url, headers=headers, data=params)
    if response.status_code != 200:
        _logger.error('cannot reach sms server, http_status is %s' % (response.status_code))
        raise SendSMSError("sms server status code is {status_code}".format(status_code=response.status_code))
    else:
        content = response.content.decode()
        data = json.loads(content)
        if data["code"] != 0:
            _logger.error('sms server response error, CODE: %s, MSG: %s(%s)' % (
                data.get('code'), data.get('msg'), data.get('detail')))
            raise SendSMSError("sms server error. {error_msg}".format(error_msg=content))
    return response
def sendRequest(host, port, path, headers, params, reqType="GET"):
    params = urlencode(params)
    path = path + "?" + params if reqType == "GET" and params else path

    if len(headers):
        logger.debug(headers)
    if len(params):
        logger.debug(params)

    logger.debug("Opening connection to %s" % host)
    conn = httplib.HTTPSConnection(host, port) if port == 443 else httplib.HTTPConnection(host, port)
    logger.debug("Sending %s request to %s" % (reqType, path))
    conn.request(reqType, path, params, headers)
    response = conn.getresponse()
    return response
def _atlassian_jwt_post_token(self):
    if not getattr(g, 'ac_client', None):
        return dict()

    args = request.args.copy()
    try:
        del args['jwt']
    except KeyError:
        pass

    signature = encode_token(
        'POST',
        request.path + '?' + urlencode(args),
        g.ac_client.clientKey,
        g.ac_client.sharedSecret)
    args['jwt'] = signature
    return dict(atlassian_jwt_post_url=request.path + '?' + urlencode(args))
def create_new_paste(contents):
    """
    Creates a new paste using bpaste.net service.

    :contents: paste contents as utf-8 encoded bytes
    :returns: url to the pasted contents
    """
    params = {
        'code': contents,
        'lexer': 'python3' if sys.version_info[0] == 3 else 'python',
        'expiry': '1week',
    }
    url = 'https://bpaste.net'
    response = urlopen(url, data=urlencode(params).encode('ascii')).read()
    m = re.search(r'href="/raw/(\w+)"', response.decode('utf-8'))
    if m:
        return '%s/show/%s' % (url, m.group(1))
    else:
        return 'bad response: ' + response.decode('utf-8')