The code examples below, extracted from open-source Python projects, illustrate how to use Python's urllib.parse module.
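Before the project snippets, here is a minimal, self-contained sketch of the urllib.parse calls that recur throughout the examples (urlparse, urlencode, urljoin, and quote_plus); the URL and query values are arbitrary placeholders, not taken from any of the projects:

import urllib.parse

# Split a URL into its components (scheme, netloc, path, query, fragment, ...)
parts = urllib.parse.urlparse('https://user:pass@example.com:8080/path/page?q=python#top')
print(parts.scheme)    # 'https'
print(parts.hostname)  # 'example.com'
print(parts.port)      # 8080

# Encode a dict as an application/x-www-form-urlencoded query string
print(urllib.parse.urlencode({'q': 'python 3', 'page': 2}))  # 'q=python+3&page=2'

# Resolve a relative reference against a base URL
print(urllib.parse.urljoin('https://example.com/a/b', '../c'))  # 'https://example.com/c'

# Percent-encode a string for safe inclusion in a URL, with spaces as '+'
print(urllib.parse.quote_plus('a&b c'))  # 'a%26b+c'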
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
    try:
        params = kwargs['params']
    except KeyError:
        params = {}

    if method_verb != 'POST':
        endpoint_path += urllib.parse.urlencode(params)
    msg = {'path': endpoint_path, 'nonce': self.nonce(), 'token_id': self.key}

    signature = jwt.encode(msg, self.secret, algorithm='HS256')
    headers = {'X-Quoine-API-Version': '2', 'X-Quoine-Auth': signature,
               'Content-Type': 'application/json'}
    request = {'headers': headers}
    if method_verb == 'POST':
        request['json'] = params
    return self.uri + endpoint_path, request
def httpPost(url, resource, params):
    headers = {
        "Content-type": "application/x-www-form-urlencoded",
    }
    try:
        conn = httplib.HTTPSConnection(url, timeout=10)
        temp_params = urllib.parse.urlencode(params)
        conn.request("POST", resource, temp_params, headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        params.clear()
        conn.close()
        return data
    except Exception:
        traceback.print_exc()
        return False
def doJenkinsSetUrl(recipes, argv):
    parser = argparse.ArgumentParser(prog="bob jenkins set-url")
    parser.add_argument("name", help="Jenkins server alias")
    parser.add_argument("url", help="New URL")
    args = parser.parse_args(argv)

    if args.name not in BobState().getAllJenkins():
        print("Jenkins '{}' not known.".format(args.name), file=sys.stderr)
        sys.exit(1)

    url = urllib.parse.urlparse(args.url)
    urlPath = url.path
    if not urlPath.endswith("/"):
        urlPath = urlPath + "/"

    config = BobState().getJenkinsConfig(args.name)
    config["url"] = {
        "scheme": url.scheme,
        "server": url.hostname,
        "port": url.port,
        "path": urlPath,
        "username": url.username,
        "password": url.password,
    }
    BobState().setJenkinsConfig(args.name, config)
async def dict_search_args_parse(self, message):
    if not message:
        await self.bot.say("Error in arg parse")
        return
    limit = 1
    query = message
    result = re.match(r"^([0-9]+)\s+(.*)$", message)
    if result:
        limit, query = [result.group(x) for x in (1, 2)]
    return int(limit), query
    # keys = ["limit"]
    # kwargs = utils.get_kwargs(args, keys)
    # try:
    #     limit = int(kwargs["limit"])
    #     if limit <= 0:
    #         raise ValueError
    # except (ValueError, KeyError):
    #     limit = 1
    # query = utils.strip_kwargs(args, keys)
def dumb_css_parser(data):
    """returns a hash of css selectors, each of which contains a hash of css attributes"""
    # remove @import sentences
    data += ';'
    importIndex = data.find('@import')
    while importIndex != -1:
        data = data[0:importIndex] + data[data.find(';', importIndex) + 1:]
        importIndex = data.find('@import')

    # parse the css. reverted from dictionary comprehension in order to support older pythons
    elements = [x.split('{') for x in data.split('}') if '{' in x.strip()]
    try:
        elements = dict([(a.strip(), dumb_property_dict(b)) for a, b in elements])
    except ValueError:
        elements = {}  # not that important

    return elements
def _call_ACIS(self, kwargs, **moreKwargs):
    '''
    Core method for calling the ACIS services.

    Returns python dictionary by de-serializing json response
    '''
    # self._formatInputDict(**kwargs)
    kwargs.update(moreKwargs)
    self._input_dict = self._stripNoneValues(kwargs)
    self.url = self.baseURL + self.webServiceSource
    if pyVersion == 2:      # python 2.x
        params = urllib.urlencode({'params': json.dumps(self._input_dict)})
        request = urllib2.Request(self.url, params, {'Accept': 'application/json'})
        response = urllib2.urlopen(request)
        jsonData = response.read()
    elif pyVersion == 3:    # python 3.x
        params = urllib.parse.urlencode({'params': json.dumps(self._input_dict)})
        params = params.encode('utf-8')
        req = urllib.request.urlopen(self.url, data=params)
        jsonData = req.read().decode()
    return json.loads(jsonData)
def _fetch_img_urls(self, keyword, safe_search=False):
    # bing img search, https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799
    url = self._get_bing_url(keyword, safe_search=safe_search)
    self.logger.debug('search url {}'.format(url))
    header = {
        'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/43.0.2357.134 Safari/537.36"}
    soup = BeautifulSoup(urllib.request.urlopen(urllib.request.Request(url, headers=header)),
                         'html.parser')
    imgs = []  # contains the link for Large original images, type of image
    for a in soup.find_all("a", {"class": "iusc"}):
        mad = json.loads(a["mad"])
        turl = mad["turl"]
        m = json.loads(a["m"])
        murl = m["murl"]
        image_name = urllib.parse.urlsplit(murl).path.split("/")[-1]
        imgs.append((image_name, turl, murl))
    return imgs
def __init__(self, var):
    #: The original string that comes through with the variable
    self.original = var
    #: The operator for the variable
    self.operator = ''
    #: List of safe characters when quoting the string
    self.safe = ''
    #: List of variables in this variable
    self.variables = []
    #: List of variable names
    self.variable_names = []
    #: List of defaults passed in
    self.defaults = {}
    # Parse the variable itself.
    self.parse()
    self.post_parse()
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    from urllib.parse import urlencode
    Authentication.__init__(self, credentials, host, request_uri, headers,
                            response, content, http)
    challenge = _parse_www_authenticate(response, 'www-authenticate')
    service = challenge['googlelogin'].get('service', 'xapi')
    # Blogger actually returns the service in the challenge
    # For the rest we guess based on the URI
    if service == 'xapi' and request_uri.find("calendar") > 0:
        service = "cl"
    # No point in guessing Base or Spreadsheet
    # elif request_uri.find("spreadsheets") > 0:
    #     service = "wise"

    auth = dict(Email=credentials[0], Passwd=credentials[1], service=service,
                source=headers['user-agent'])
    resp, content = self.http.request(
        "https://www.google.com/accounts/ClientLogin",
        method="POST",
        body=urlencode(auth),
        headers={'Content-Type': 'application/x-www-form-urlencoded'})
    lines = content.split('\n')
    d = dict([tuple(line.split("=", 1)) for line in lines if line])
    if resp.status == 403:
        self.Auth = ""
    else:
        self.Auth = d['Auth']
def testGet301(self):
    # Test that we automatically follow 301 redirects
    # and that we cache the 301 response
    uri = urllib.parse.urljoin(base, "301/onestep.asis")
    destination = urllib.parse.urljoin(base, "302/final-destination.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertTrue('content-location' in response)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, False)

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, True)
def testGet302RedirectionLimit(self):
    # Test that we can set a lower redirection limit
    # and that we raise an exception when we exceed
    # that limit.
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "302/twostep.asis")
    try:
        (response, content) = self.http.request(uri, "GET", redirections=1)
        self.fail("This should not happen")
    except httplib2.RedirectLimit:
        pass
    except Exception as e:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous != None)
def testGet302NoLocation(self):
    # Test that we throw an exception when we get
    # a 302 with no Location: header.
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "302/no-location.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.RedirectMissingLocation:
        pass
    except Exception as e:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected but"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"This is content"))
def testGet304(self):
    # Test that we use ETags properly to validate our cache
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")

    (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
    (response, content) = self.http.request(uri, "GET",
                                            headers={'accept-encoding': 'identity',
                                                     'cache-control': 'must-revalidate'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    cache_file_name = os.path.join(cacheDirName,
                                   httplib2.safename(httplib2.urlnorm(uri)[-1]))
    f = open(cache_file_name, "r")
    status_line = f.readline()
    f.close()

    self.assertTrue(status_line.startswith("status:"))

    (response, content) = self.http.request(uri, "HEAD", headers={'accept-encoding': 'identity'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    (response, content) = self.http.request(uri, "GET",
                                            headers={'accept-encoding': 'identity',
                                                     'range': 'bytes=0-0'})
    self.assertEqual(response.status, 206)
    self.assertEqual(response.fromcache, False)
def testGet307(self):
    # Test that we do follow 307 redirects but
    # do not cache the 307
    uri = urllib.parse.urljoin(base, "307/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
def testNoVary(self):
    pass
    # when there is no vary, a different Accept header (e.g.) should not
    # impact if the cache is used
    # test that the vary header is not sent
    # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
    # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    # self.assertEqual(response.status, 200)
    # self.assertFalse('vary' in response)
    #
    # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    # self.assertEqual(response.status, 200)
    # self.assertEqual(response.fromcache, True, msg="Should be from cache")
    #
    # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
    # self.assertEqual(response.status, 200)
    # self.assertEqual(response.fromcache, True, msg="Should be from cache")
def testVaryHeaderDouble(self):
    uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
    (response, content) = self.http.request(uri, "GET", headers={
        'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
    self.assertEqual(response.status, 200)
    self.assertTrue('vary' in response)

    # we are from cache
    (response, content) = self.http.request(uri, "GET", headers={
        'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
    self.assertEqual(response.fromcache, True, msg="Should be from cache")

    (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)

    # get the resource again, not from cache, varied headers don't match exact
    (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False, msg="Should not be from cache")
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testBasicAuth(self):
    # Test Basic Authentication
    uri = urllib.parse.urljoin(base, "basic/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    uri = urllib.parse.urljoin(base, "basic/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials('joe', 'password')
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)

    uri = urllib.parse.urljoin(base, "basic/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
def testBasicAuthTwoDifferentCredentials(self):
    # Test Basic Authentication with multiple sets of credentials
    uri = urllib.parse.urljoin(base, "basic2/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    uri = urllib.parse.urljoin(base, "basic2/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials('fred', 'barney')
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)

    uri = urllib.parse.urljoin(base, "basic2/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
def testDigestAuthStale(self):
    # Test that we can handle a nonce becoming stale
    uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
    self.http.add_credentials('joe', 'password')
    (response, content) = self.http.request(uri, "GET",
                                            headers={"cache-control": "no-cache"})
    info = httplib2._parse_www_authenticate(response, 'authentication-info')
    self.assertEqual(response.status, 200)

    time.sleep(3)  # Sleep long enough that the nonce becomes stale

    (response, content) = self.http.request(uri, "GET",
                                            headers={"cache-control": "no-cache"})
    self.assertFalse(response.fromcache)
    self.assertTrue(response._stale_digest)
    info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
    self.assertEqual(response.status, 200)
def parse_songci(self, response):
    item = SongCiItem()
    item['url'] = response.url
    full_title = response.css('div.son1>h1::text').extract_first()
    if full_title:
        try:
            item['tune_name'], item['title'] = full_title.split('·')
        except ValueError:
            item['title'] = full_title
    son2_p = response.css('div.son2>p')
    for p in son2_p:
        # NOTE: the Chinese label literals here were garbled to '??' in
        # extraction; '朝代' (dynasty) and '作者' (author) are reconstructed
        # guesses based on the field names
        for name, field in {'朝代': 'dynasty', '作者': 'author'}.items():
            if name in p.css('::text').extract_first():
                item[field] = p.css('::text').extract()[1]
    content = ''.join(response.css('div#cont::text').extract()).strip()
    if content:
        item['content'] = content
    else:
        all_p_texts = son2_p.css('::text').extract()
        try:
            # the marker literal below ('???') was also garbled in extraction
            # and is left as-is; it is the label that precedes the poem body
            item['content'] = '\n'.join(all_p_texts[all_p_texts.index('???') + 1:]).strip()
        except ValueError:
            self.logger.error('Cannot parse item. url=%s', response.url)
    yield item
def sendGetRequest(self, parser=None):
    self.response = None
    params = self.params  # [param.items()[0] for param in self.params]
    parser = parser or self.parser or ResponseParser()

    headers = dict(list({"User-Agent": self.getUserAgent(),
                         "Accept": parser.getMeta()
                         }.items()) + list(self.headers.items()))

    host, port, path = self.getConnectionParameters()
    self.response = WARequest.sendRequest(host, port, path, headers, params, "GET")

    if not self.response.status == WARequest.OK:
        logger.error("Request not success, status was %s" % self.response.status)
        return {}

    data = self.response.read()
    logger.info(data)
    self.sent = True
    return parser.parse(data.decode(), self.pvars)
def sendPostRequest(self, parser=None):
    self.response = None
    params = self.params  # [param.items()[0] for param in self.params]
    parser = parser or self.parser or ResponseParser()

    headers = dict(list({"User-Agent": self.getUserAgent(),
                         "Accept": parser.getMeta(),
                         "Content-Type": "application/x-www-form-urlencoded"
                         }.items()) + list(self.headers.items()))

    host, port, path = self.getConnectionParameters()
    self.response = WARequest.sendRequest(host, port, path, headers, params, "POST")

    if not self.response.status == WARequest.OK:
        logger.error("Request not success, status was %s" % self.response.status)
        return {}

    data = self.response.read()
    logger.info(data)
    self.sent = True
    return parser.parse(data.decode(), self.pvars)
def setUp(self):
    # the URLs for now which will have the WSDL files and the XSD file
    # urlparse.urljoin('file:', urllib.pathname2url(os.path.abspath("service.xml")))
    import os
    import urllib.parse
    from urllib.request import pathname2url

    query_services_url = urllib.parse.urljoin(
        'file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-query-1.7.wsdl')))
    userservices_url = urllib.parse.urljoin(
        'file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-auth-1.7.wsdl')))

    # initializing the Suds clients for each url, with the client certificate
    # you'll have in the same dir as this file
    query_services_client = Client(query_services_url,
                                   transport=HTTPSClientCertTransport('vip_certificate.crt',
                                                                      'vip_certificate.crt'))
    user_services_client = Client(userservices_url,
                                  transport=HTTPSClientCertTransport('vip_certificate.crt',
                                                                     'vip_certificate.crt'))
    self.test_user_services_object = SymantecUserServices(user_services_client)
def setUp(self):
    # the URLs for now which will have the WSDL files and the XSD file
    import os
    import urllib.parse
    from urllib.request import pathname2url

    managementservices_url = urllib.parse.urljoin(
        'file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')))
    # managementservices_url = 'http://webdev.cse.msu.edu/~huynhall/vipuserservices-mgmt-1.7.wsdl'

    # initializing the Suds client, with the client certificate
    # you'll have in the same dir as this file
    self.management_client = Client(managementservices_url,
                                    transport=HTTPSClientCertTransport('vip_certificate.crt',
                                                                       'vip_certificate.crt'))
    self.test_management_services_object = SymantecManagementServices(self.management_client)
def __creatBlocks(self):
    """
    Second part of parsing.

    Find blocks and create a list.
    """
    w = list(zip(self.lines, self.indentationList))
    self.blocks, indentation, level = "[", 0, 0
    for i in w:
        if i[1] > indentation:
            level = level + 1
            self.blocks += ",[" + '"' + urllib.parse.quote_plus(i[0]) + '"'
        elif i[1] == 0:
            if len(self.blocks) > 1:
                self.blocks += "]" * (level) + ','
            self.blocks += '"' + urllib.parse.quote_plus(i[0]) + '"'
            level = 0
        elif i[1] < indentation:
            if w.index(i) != len(w):
                self.blocks += "]" + "," + '"' + urllib.parse.quote_plus(i[0]) + '"'
                level += -1
        elif i[1] == indentation:
            self.blocks += "," + '"' + urllib.parse.quote_plus(i[0]) + '"'
        indentation = i[1]
    self.blocks += "]" * (level + 1)
    self.blocks = ast.literal_eval(self.blocks)
def _makeSafeAbsoluteURI(base, rel=None):
    # bail if ACCEPTABLE_URI_SCHEMES is empty
    if not ACCEPTABLE_URI_SCHEMES:
        return _urljoin(base, rel or '')
    if not base:
        return rel or ''
    if not rel:
        try:
            scheme = urllib.parse.urlparse(base)[0]
        except ValueError:
            return ''
        if not scheme or scheme in ACCEPTABLE_URI_SCHEMES:
            return base
        return ''
    uri = _urljoin(base, rel)
    if uri.strip().split(':', 1)[0] not in ACCEPTABLE_URI_SCHEMES:
        return ''
    return uri
def http_error_401(self, req, fp, code, msg, headers):
    # Check if
    # - server requires digest auth, AND
    # - we tried (unsuccessfully) with basic auth, AND
    # If all conditions hold, parse authentication information
    # out of the Authorization header we sent the first time
    # (for the username and password) and the WWW-Authenticate
    # header the server sent back (for the realm) and retry
    # the request with the appropriate digest auth headers instead.
    # This evil genius hack has been brought to you by Aaron Swartz.
    host = urllib.parse.urlparse(req.get_full_url())[1]
    if base64 is None or 'Authorization' not in req.headers \
            or 'WWW-Authenticate' not in headers:
        return self.http_error_default(req, fp, code, msg, headers)
    auth = _base64decode(req.headers['Authorization'].split(' ')[1])
    user, passw = auth.split(':')
    realm = re.findall('realm="([^"]*)"', headers['WWW-Authenticate'])[0]
    self.add_password(realm, host, user, passw)
    retry = self.http_error_auth_reqed('www-authenticate', host, req, headers)
    self.reset_retry_count()
    return retry
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urllib.parse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = ''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urllib.parse.urlunsplit(parts)
    else:
        return url
def __deserialize_date(self, string):
    """
    Deserializes string to date.

    :param string: str.
    :return: date.
    """
    try:
        from dateutil.parser import parse
        return parse(string).date()
    except ImportError:
        return string
    except ValueError:
        raise ApiException(
            status=0,
            reason="Failed to parse `{0}` into a date object"
            .format(string)
        )
def __deserialize_datatime(self, string):
    """
    Deserializes string to datetime.

    The string should be in iso8601 datetime format.

    :param string: str.
    :return: datetime.
    """
    try:
        from dateutil.parser import parse
        return parse(string)
    except ImportError:
        return string
    except ValueError:
        raise ApiException(
            status=0,
            reason="Failed to parse `{0}` into a datetime object"
            .format(string)
        )
def resolve_reference(cls, design_ref):
    """Resolve a reference to a design document.

    Locate a schema handler based on the URI scheme of the data reference
    and use that handler to get the data referenced.

    :param design_ref: A URI-formatted reference to a data entity
    """
    try:
        design_uri = urllib.parse.urlparse(design_ref)
        handler = cls.scheme_handlers.get(design_uri.scheme, None)
        if handler is None:
            raise errors.InvalidDesignReference(
                "Invalid reference scheme %s: no handler." % design_uri.scheme)
        else:
            # Have to do a little magic to call the classmethod as a pointer
            return handler.__get__(None, cls)(design_uri)
    except ValueError:
        raise errors.InvalidDesignReference(
            "Cannot resolve design reference %s: unable to parse as valid URI."
            % design_ref)
def resolve_reference_http(cls, design_uri):
    """Retrieve design documents from http/https endpoints.

    Return a byte array of the response content. Support unsecured or
    basic auth

    :param design_uri: Tuple as returned by urllib.parse for the design reference
    """
    if design_uri.username is not None and design_uri.password is not None:
        response = requests.get(design_uri.geturl(),
                                auth=(design_uri.username, design_uri.password),
                                timeout=30)
    else:
        response = requests.get(design_uri.geturl(), timeout=30)

    return response.content
def resolve_reference_ucp(cls, design_uri):
    """Retrieve artifacts from a UCP service endpoint.

    Return a byte array of the response content. Assumes Keystone
    authentication required.

    :param design_uri: Tuple as returned by urllib.parse for the design reference
    """
    ks_sess = KeystoneUtils.get_session()
    (new_scheme, foo) = re.subn(r'^[^+]+\+', '', design_uri.scheme)
    url = urllib.parse.urlunparse(
        (new_scheme, design_uri.netloc, design_uri.path,
         design_uri.params, design_uri.query, design_uri.fragment))
    logger = logging.getLogger(__name__)
    logger.debug("Calling Keystone session for url %s" % str(url))
    resp = ks_sess.get(url)
    if resp.status_code >= 400:
        raise errors.InvalidDesignReference(
            "Received error code for reference %s: %s - %s" %
            (url, str(resp.status_code), resp.text))
    return resp.content
def dl_json(url, header={}):
    '''
    http GET one file, and parse json text.

    return json info result
    '''
    # NOTE json must use 'utf-8' encoding
    blob = dl_blob(url, header=header)
    try:
        text = blob.decode('utf-8')
    except Exception as e:
        er = err.DecodingError('decode blob to json text with "utf-8" failed, URL ', url)
        er.blob = blob
        raise er from e
    try:
        info = json.loads(text)
        return info
    except Exception as e:
        er = err.ParseJSONError('parse json text failed, URL ', url)
        er.text = text
        raise er from e
    # TODO now only support 'utf-8' encoding
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
    try:
        req = kwargs['params']
    except KeyError:
        req = {}
    req['nonce'] = self.nonce()
    postdata = urllib.parse.urlencode(req)

    # Unicode-objects must be encoded before hashing
    encoded = (str(req['nonce']) + postdata).encode('utf-8')
    message = (endpoint_path.encode('utf-8') +
               hashlib.sha256(encoded).digest())

    signature = hmac.new(base64.b64decode(self.secret),
                         message, hashlib.sha512)
    sigdigest = base64.b64encode(signature.digest())

    headers = {
        'API-Key': self.key,
        'API-Sign': sigdigest.decode('utf-8')
    }
    return url, {'data': req, 'headers': headers}
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
    nonce = self.nonce()
    try:
        params = kwargs['params']
    except KeyError:
        params = {}
    params['apikey'] = self.key
    params['nonce'] = nonce

    post_params = params
    post_params.update({'nonce': nonce, 'method': endpoint})
    post_params = urllib.parse.urlencode(post_params)
    url = uri + post_params

    # HMAC-SHA512 over the full URL, keyed by the API secret;
    # both key and message must be bytes on Python 3
    sig = hmac.new(self.secret.encode('utf-8'), url.encode('utf-8'), hashlib.sha512)
    headers = {'apisign': sig.hexdigest()}
    return url, {'headers': headers}
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
    try:
        params = kwargs['params']
    except KeyError:
        params = {}

    nonce = self.nonce()

    req_string = endpoint_path + '?apikey=' + self.key + "&nonce=" + nonce + '&'
    req_string += urllib.parse.urlencode(params)
    headers = {"apisign": hmac.new(self.secret.encode('utf-8'),
                                   (self.uri + req_string).encode('utf-8'),
                                   hashlib.sha512).hexdigest()}
    return self.uri + req_string, {'headers': headers, 'params': {}}
def get(url, headers={}, params=None):
    if sys.version_info[0] == 3:
        if params:
            data = parse.urlencode(params)
            url = "%s?%s" % (url, data)
            req = request.Request(url=url, headers=headers)
        else:
            req = request.Request(url=url, headers=headers)
        response = request.urlopen(req)
        return Response(response)
    elif sys.version_info[0] == 2:
        if params:
            data = urllib.urlencode(params)
            url = url + '?' + data
            req = urllib2.Request(url=url, headers=headers)
        else:
            req = urllib2.Request(url=url, headers=headers)
        response = urllib2.urlopen(req)
        return Response(response)
    else:
        return None
def parse(self, response, crawler):
    document = lxml.etree.HTML(response.text)
    for title in document.cssselect('tr.athing a.storylink'):
        yield title.text

    urlinfo = urllib.parse.urlparse(response.url)
    base_url = urlinfo.scheme + '://' + urlinfo.netloc
    try:
        href = document.cssselect('a.morelink')[0].get('href')
    except IndexError:
        # no "More" link on the page; stop paginating
        return
    next_url = urllib.parse.urljoin(base_url, href)
    crawler.schedule_request(next_url)
def __init__(self, url=None, *args, **kwargs):
    super(Bazaar, self).__init__(url, *args, **kwargs)
    # Python >= 2.7.4, 3.3 doesn't have uses_fragment or non_hierarchical
    # Register lp but do not expose as a scheme to support bzr+lp.
    if getattr(urllib_parse, 'uses_fragment', None):
        urllib_parse.uses_fragment.extend(['lp'])
        urllib_parse.non_hierarchical.extend(['lp'])