Python urllib 模块,parse() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse()

项目:bitex    作者:nlsdfnbch    | 项目源码 | 文件源码
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
        try:
            params = kwargs['params']
        except KeyError:
            params = {}

        if method_verb != 'POST':
            endpoint_path += urllib.parse.urlencode(params)
        msg = {'path': endpoint_path, 'nonce': self.nonce(), 'token_id': self.key}

        signature = jwt.encode(msg, self.secret, algorithm='HS256')
        headers = {'X-Quoine-API-Version': '2', 'X-Quoine-Auth': signature,
                   'Content-Type': 'application/json'}
        request = {'headers': headers}
        if method_verb == 'POST':
            request['json'] = params
        return self.uri + endpoint_path, request
项目:bitcoin-arbitrage    作者:ucfyao    | 项目源码 | 文件源码
def httpPost(url,resource,params):
    headers = {
        "Content-type" : "application/x-www-form-urlencoded",
    }
    try :
        conn = httplib.HTTPSConnection(url, timeout=10)
        temp_params = urllib.parse.urlencode(params)
        conn.request("POST", resource, temp_params, headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        params.clear()
        conn.close()
        return data
    except:
    # except Exception,e:  
        # print(Exception,":",e)
        traceback.print_exc()
        return False
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def doJenkinsSetUrl(recipes, argv):
    parser = argparse.ArgumentParser(prog="bob jenkins set-url")
    parser.add_argument("name", help="Jenkins server alias")
    parser.add_argument("url", help="New URL")
    args = parser.parse_args(argv)

    if args.name not in BobState().getAllJenkins():
        print("Jenkins '{}' not known.".format(args.name), file=sys.stderr)
        sys.exit(1)

    url = urllib.parse.urlparse(args.url)
    urlPath = url.path
    if not urlPath.endswith("/"): urlPath = urlPath + "/"

    config = BobState().getJenkinsConfig(args.name)
    config["url"] = {
        "scheme" : url.scheme,
        "server" : url.hostname,
        "port" : url.port,
        "path" : urlPath,
        "username" : url.username,
        "password" : url.password,
    }
    BobState().setJenkinsConfig(args.name, config)
项目:Jumper-Cogs    作者:Redjumpman    | 项目源码 | 文件源码
def dict_search_args_parse(self, message):
        if not message:
            await self.bot.say("Error in arg parse")
            return
        limit = 1
        query = message
        result = re.match(r"^([0-9]+)\s+(.*)$", message)
        if result:
            limit, query = [result.group(x) for x in (1, 2)]
        return int(limit), query
        # keys = ["limit"]
        # kwargs = utils.get_kwargs(args, keys)
        # try:
        #     limit = int(kwargs["limit"])
        #     if limit <= 0:
        #         raise ValueError
        # except (ValueError, KeyError):
        #     limit = 1
        # query = utils.strip_kwargs(args, keys)
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def dumb_css_parser(data):
    """returns a hash of css selectors, each of which contains a hash of css attributes"""
    # remove @import sentences
    data += ';'
    importIndex = data.find('@import')
    while importIndex != -1:
        data = data[0:importIndex] + data[data.find(';', importIndex) + 1:]
        importIndex = data.find('@import')

    # parse the css. reverted from dictionary compehension in order to support older pythons
    elements =  [x.split('{') for x in data.split('}') if '{' in x.strip()]
    try:
        elements = dict([(a.strip(), dumb_property_dict(b)) for a, b in elements])
    except ValueError:
        elements = {} # not that important

    return elements
项目:IM_Climate    作者:IMDProjects    | 项目源码 | 文件源码
def _call_ACIS(self, kwargs, **moreKwargs):
        '''
        Core method for calling the ACIS services.

        Returns python dictionary by de-serializing json response
        '''
        #self._formatInputDict(**kwargs)
        kwargs.update(moreKwargs)
        self._input_dict = self._stripNoneValues(kwargs)
        self.url = self.baseURL + self.webServiceSource
        if pyVersion == 2:      #python 2.x
            params = urllib.urlencode({'params':json.dumps(self._input_dict)})
            request = urllib2.Request(self.url, params, {'Accept':'application/json'})
            response = urllib2.urlopen(request)
            jsonData = response.read()
        elif pyVersion == 3:    #python 3.x
            params = urllib.parse.urlencode({'params':json.dumps(self._input_dict)})
            params = params.encode('utf-8')
            req = urllib.request.urlopen(self.url, data = params)
            jsonData = req.read().decode()
        return json.loads(jsonData)
项目:postcards    作者:abertschi    | 项目源码 | 文件源码
def _fetch_img_urls(self, keyword, safe_search=False):
        # bing img search, https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799

        url = self._get_bing_url(keyword, safe_search=safe_search)
        self.logger.debug('search url {}'.format(url))

        header = {
            'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/43.0.2357.134 Safari/537.36"}

        soup = BeautifulSoup(urllib.request.urlopen(urllib.request.Request(url, headers=header)), 'html.parser')
        imgs = []  # contains the link for Large original images, type of  image
        for a in soup.find_all("a", {"class": "iusc"}):
            mad = json.loads(a["mad"])
            turl = mad["turl"]
            m = json.loads(a["m"])
            murl = m["murl"]

            image_name = urllib.parse.urlsplit(murl).path.split("/")[-1]
            imgs.append((image_name, turl, murl))

        return imgs
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, var):
        #: The original string that comes through with the variable
        self.original = var
        #: The operator for the variable
        self.operator = ''
        #: List of safe characters when quoting the string
        self.safe = ''
        #: List of variables in this variable
        self.variables = []
        #: List of variable names
        self.variable_names = []
        #: List of defaults passed in
        self.defaults = {}
        # Parse the variable itself.
        self.parse()
        self.post_parse()
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def __init__(self, credentials, host, request_uri, headers, response, content, http):
        from urllib.parse import urlencode
        Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
        challenge = _parse_www_authenticate(response, 'www-authenticate')
        service = challenge['googlelogin'].get('service', 'xapi')
        # Bloggger actually returns the service in the challenge
        # For the rest we guess based on the URI
        if service == 'xapi' and  request_uri.find("calendar") > 0:
            service = "cl"
        # No point in guessing Base or Spreadsheet
        #elif request_uri.find("spreadsheets") > 0:
        #    service = "wise"

        auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
        resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
        lines = content.split('\n')
        d = dict([tuple(line.split("=", 1)) for line in lines if line])
        if resp.status == 403:
            self.Auth = ""
        else:
            self.Auth = d['Auth']
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGet301(self):
        # Test that we automatically follow 301 redirects
        # and that we cache the 301 response
        uri = urllib.parse.urljoin(base, "301/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertTrue('content-location' in response)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, False)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, True)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGet302RedirectionLimit(self):
        # Test that we can set a lower redirection limit
        # and that we raise an exception when we exceed
        # that limit.
        self.http.force_exception_to_status_code = False

        uri = urllib.parse.urljoin(base, "302/twostep.asis")
        try:
            (response, content) = self.http.request(uri, "GET", redirections = 1)
            self.fail("This should not happen")
        except httplib2.RedirectLimit:
            pass
        except Exception as e:
            self.fail("Threw wrong kind of exception ")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET", redirections = 1)
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Redirected more"))
        self.assertEqual("302", response['status'])
        self.assertTrue(content.startswith(b"<html>"))
        self.assertTrue(response.previous != None)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGet302NoLocation(self):
        # Test that we throw an exception when we get
        # a 302 with no Location: header.
        self.http.force_exception_to_status_code = False
        uri = urllib.parse.urljoin(base, "302/no-location.asis")
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.RedirectMissingLocation:
            pass
        except Exception as e:
            self.fail("Threw wrong kind of exception ")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Redirected but"))
        self.assertEqual("302", response['status'])
        self.assertTrue(content.startswith(b"This is content"))
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGet304(self):
        # Test that we use ETags properly to validate our cache
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")

        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'must-revalidate'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
        f = open(cache_file_name, "r")
        status_line = f.readline()
        f.close()

        self.assertTrue(status_line.startswith("status:"))

        (response, content) = self.http.request(uri, "HEAD", headers = {'accept-encoding': 'identity'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'range': 'bytes=0-0'})
        self.assertEqual(response.status, 206)
        self.assertEqual(response.fromcache, False)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGet307(self):
        # Test that we do follow 307 redirects but
        # do not cache the 307
        uri = urllib.parse.urljoin(base, "307/onestep.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testNoVary(self):
        pass
        # when there is no vary, a different Accept header (e.g.) should not
        # impact if the cache is used
        # test that the vary header is not sent
        # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertFalse('vary' in response)
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testVaryHeaderDouble(self):
        uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
        self.assertEqual(response.status, 200)
        self.assertTrue('vary' in response)

        # we are from cache
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
        self.assertEqual(response.fromcache, True, msg="Should be from cache")

        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        # get the resource again, not from cache, varied headers don't match exact
        (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGetGZipFailure(self):
        # Test that we raise a good exception when the gzip fails
        self.http.force_exception_to_status_code = False
        uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Content purported"))
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testGetDeflateFailure(self):
        # Test that we raise a good exception when the deflate fails
        self.http.force_exception_to_status_code = False

        uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Content purported"))
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testBasicAuth(self):
        # Test Basic Authentication
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testBasicAuthTwoDifferentCredentials(self):
        # Test Basic Authentication with multiple sets of credentials
        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic2/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('fred', 'barney')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def testDigestAuthStale(self):
        # Test that we can handle a nonce becoming stale
        uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
        info = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)

        time.sleep(3)
        # Sleep long enough that the nonce becomes stale

        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
        self.assertFalse(response.fromcache)
        self.assertTrue(response._stale_digest)
        info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)
项目:sc_spider    作者:wings27    | 项目源码 | 文件源码
def parse_songci(self, response):
        item = SongCiItem()
        item['url'] = response.url
        full_title = response.css('div.son1>h1::text').extract_first()
        if full_title:
            try:
                item['tune_name'], item['title'] = full_title.split('·')
            except ValueError:
                item['title'] = full_title

        son2_p = response.css('div.son2>p')
        for p in son2_p:
            for name, field in {'??': 'dynasty', '??': 'author'}.items():
                if name in p.css('::text').extract_first():
                    item[field] = p.css('::text').extract()[1]
        content = ''.join(response.css('div#cont::text').extract()).strip()
        if content:
            item['content'] = content
        else:
            all_p_texts = son2_p.css('::text').extract()
            try:
                item['content'] = '\n'.join(all_p_texts[all_p_texts.index('???') + 1:]).strip()
            except ValueError:
                self.logger.error('Cannot parse item. url=%s', response.url)
        yield item
项目:whatsapp-rest-webservice    作者:svub    | 项目源码 | 文件源码
def sendGetRequest(self, parser = None):
        self.response = None
        params =  self.params#[param.items()[0] for param in self.params];

        parser = parser or self.parser or ResponseParser()

        headers = dict(list({"User-Agent":self.getUserAgent(),
                "Accept": parser.getMeta()
            }.items()) + list(self.headers.items()));

        host,port,path = self.getConnectionParameters()

        self.response = WARequest.sendRequest(host, port, path, headers, params, "GET")

        if not self.response.status == WARequest.OK:
            logger.error("Request not success, status was %s"%self.response.status)
            return {}

        data = self.response.read()
        logger.info(data)

        self.sent = True
        return parser.parse(data.decode(), self.pvars)
项目:whatsapp-rest-webservice    作者:svub    | 项目源码 | 文件源码
def sendPostRequest(self, parser = None):
        self.response = None
        params =  self.params #[param.items()[0] for param in self.params];

        parser = parser or self.parser or ResponseParser()

        headers = dict(list({"User-Agent":self.getUserAgent(),
                "Accept": parser.getMeta(),
                "Content-Type":"application/x-www-form-urlencoded"
            }.items()) + list(self.headers.items()))

        host,port,path = self.getConnectionParameters()
        self.response = WARequest.sendRequest(host, port, path, headers, params, "POST")


        if not self.response.status == WARequest.OK:
            logger.error("Request not success, status was %s" % self.response.status)
            return {}

        data = self.response.read()

        logger.info(data)

        self.sent = True
        return parser.parse(data.decode(), self.pvars)
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def setUp(self):
        # the URLs for now which will have the WSDL files and the XSD file
        #urlparse.urljoin('file:', urllib.pathname2url(os.path.abspath("service.xml")))
        import urllib
        import os
        from urllib.parse import urlparse
        from urllib.request import pathname2url

        query_services_url = urllib.parse.urljoin('file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-query-1.7.wsdl')))
        userservices_url = urllib.parse.urljoin('file:', pathname2url(os.path.abspath('../wsdl_files/vipuserservices-auth-1.7.wsdl')))

        # initializing the Suds clients for each url, with the client certificate youll have in the same dir as this file
        query_services_client = Client(query_services_url,
                                       transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))
        user_services_client = Client(userservices_url,
                                      transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))

        self.test_user_services_object = SymantecUserServices(user_services_client)
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def setUp(self):
        # the URLs for now which will have the WSDL files and the XSD file
        import urllib
        import os
        from urllib.parse import urlparse
        from urllib.request import pathname2url

        managementservices_url = urllib.parse.urljoin('file:', pathname2url(
            os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')))
        # managementservices_url = 'http://webdev.cse.msu.edu/~huynhall/vipuserservices-mgmt-1.7.wsdl'

        # initializing the Suds clients for each url, with the client certificate youll have in the same dir as this file
        self.management_client = Client(managementservices_url,
                                        transport=HTTPSClientCertTransport('vip_certificate.crt',
                                                                           'vip_certificate.crt'))

        self.test_management_services_object = SymantecManagementServices(self.management_client)
        pass
项目:DASS    作者:JulesMichael    | 项目源码 | 文件源码
def __creatBlocks(self):
        """
        Second part of parsing. Find blocks and creat a list.
        """
        w = list(zip(self.lines, self.indentationList))
        self.blocks, indentation, level = "[", 0, 0
        for i in w:
            if i[1] > indentation:
                level = level + 1
                self.blocks += ",[" + '"' + urllib.parse.quote_plus(i[0]) + '"'
            elif i[1] == 0:
                if len(self.blocks) > 1:
                    self.blocks += "]" * (level) + ','
                self.blocks += '"' + urllib.parse.quote_plus(i[0]) + '"'
                level = 0
            elif i[1] < indentation:
                if w.index(i) != len(w):
                    self.blocks += "]" + "," + '"' + \
                        urllib.parse.quote_plus(i[0]) + '"'
                    level += -1
            elif i[1] == indentation:
                self.blocks += "," + '"' + urllib.parse.quote_plus(i[0]) + '"'
            indentation = i[1]
        self.blocks += "]" * (level + 1)
        self.blocks = ast.literal_eval(self.blocks)
项目:SublimeRSS    作者:JaredMHall    | 项目源码 | 文件源码
def _makeSafeAbsoluteURI(base, rel=None):
    # bail if ACCEPTABLE_URI_SCHEMES is empty
    if not ACCEPTABLE_URI_SCHEMES:
        return _urljoin(base, rel or '')
    if not base:
        return rel or ''
    if not rel:
        try:
            scheme = urllib.parse.urlparse(base)[0]
        except ValueError:
            return ''
        if not scheme or scheme in ACCEPTABLE_URI_SCHEMES:
            return base
        return ''
    uri = _urljoin(base, rel)
    if uri.strip().split(':', 1)[0] not in ACCEPTABLE_URI_SCHEMES:
        return ''
    return uri
项目:SublimeRSS    作者:JaredMHall    | 项目源码 | 文件源码
def http_error_401(self, req, fp, code, msg, headers):
        # Check if
        # - server requires digest auth, AND
        # - we tried (unsuccessfully) with basic auth, AND
        # If all conditions hold, parse authentication information
        # out of the Authorization header we sent the first time
        # (for the username and password) and the WWW-Authenticate
        # header the server sent back (for the realm) and retry
        # the request with the appropriate digest auth headers instead.
        # This evil genius hack has been brought to you by Aaron Swartz.
        host = urllib.parse.urlparse(req.get_full_url())[1]
        if base64 is None or 'Authorization' not in req.headers \
                          or 'WWW-Authenticate' not in headers:
            return self.http_error_default(req, fp, code, msg, headers)
        auth = _base64decode(req.headers['Authorization'].split(' ')[1])
        user, passw = auth.split(':')
        realm = re.findall('realm="([^"]*)"', headers['WWW-Authenticate'])[0]
        self.add_password(realm, host, user, passw)
        retry = self.http_error_auth_reqed('www-authenticate', host, req, headers)
        self.reset_retry_count()
        return retry
项目:SublimeRSS    作者:JaredMHall    | 项目源码 | 文件源码
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urllib.parse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = ''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urllib.parse.urlunsplit(parts)
    else:
        return url
项目:isilon_sdk_7_2_python    作者:Isilon    | 项目源码 | 文件源码
def __deserialize_date(self, string):
        """
        Deserializes string to date.

        :param string: str.
        :return: date.
        """
        try:
            from dateutil.parser import parse
            return parse(string).date()
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a date object"
                .format(string)
            )
项目:isilon_sdk_7_2_python    作者:Isilon    | 项目源码 | 文件源码
def __deserialize_datatime(self, string):
        """
        Deserializes string to datetime.

        The string should be in iso8601 datetime format.

        :param string: str.
        :return: datetime.
        """
        try:
            from dateutil.parser import parse
            return parse(string)
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a datetime object".
                format(string)
            )
项目:connect-python-sdk    作者:square    | 项目源码 | 文件源码
def __deserialize_date(self, string):
        """
        Deserializes string to date.

        :param string: str.
        :return: date.
        """
        try:
            from dateutil.parser import parse
            return parse(string).date()
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a date object"
                .format(string)
            )
项目:connect-python-sdk    作者:square    | 项目源码 | 文件源码
def __deserialize_datatime(self, string):
        """
        Deserializes string to datetime.

        The string should be in iso8601 datetime format.

        :param string: str.
        :return: datetime.
        """
        try:
            from dateutil.parser import parse
            return parse(string)
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a datetime object".
                format(string)
            )
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def resolve_reference(cls, design_ref):
        """Resolve a reference to a design document.

        Locate a schema handler based on the URI scheme of the data reference
        and use that handler to get the data referenced.

        :param design_ref: A URI-formatted reference to a data entity
        """
        try:
            design_uri = urllib.parse.urlparse(design_ref)

            handler = cls.scheme_handlers.get(design_uri.scheme, None)

            if handler is None:
                raise errors.InvalidDesignReference(
                    "Invalid reference scheme %s: no handler." %
                    design_uri.scheme)
            else:
                # Have to do a little magic to call the classmethod as a pointer
                return handler.__get__(None, cls)(design_uri)
        except ValueError:
            raise errors.InvalidDesignReference(
                "Cannot resolve design reference %s: unable to parse as valid URI."
                % design_ref)
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def resolve_reference_http(cls, design_uri):
        """Retrieve design documents from http/https endpoints.

        Return a byte array of the response content. Support unsecured or
        basic auth

        :param design_uri: Tuple as returned by urllib.parse for the design reference
        """
        if design_uri.username is not None and design_uri.password is not None:
            response = requests.get(
                design_uri.geturl(),
                auth=(design_uri.username, design_uri.password),
                timeout=30)
        else:
            response = requests.get(design_uri.geturl(), timeout=30)

        return response.content
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def resolve_reference_ucp(cls, design_uri):
        """Retrieve artifacts from a UCP service endpoint.

        Return a byte array of the response content. Assumes Keystone
        authentication required.

        :param design_uri: Tuple as returned by urllib.parse for the design reference
        """
        ks_sess = KeystoneUtils.get_session()
        (new_scheme, foo) = re.subn('^[^+]+\+', '', design_uri.scheme)
        url = urllib.parse.urlunparse((new_scheme, design_uri.netloc, design_uri.path,
                                       design_uri.params, design_uri.query, design_uri.fragment))
        logger = logging.getLogger(__name__)
        logger.debug("Calling Keystone session for url %s" % str(url))
        resp = ks_sess.get(url)
        if resp.status_code >= 400:
            raise errors.InvalidDesignReference(
                "Received error code for reference %s: %s - %s" % (url, str(resp.status_code), resp.text))
        return resp.content
项目:lyp_pv    作者:wwqgtxx    | 项目源码 | 文件源码
def dl_json(url, header={}):
    '''
    http GET one file, and parse json text. return json info result
    '''
    # NOTE json must use 'utf-8' encoding
    blob = dl_blob(url, header=header)
    try:
        text = blob.decode('utf-8')
    except Exception as e:
        er = err.DecodingError('decode blob to json text with \"utf-8\" failed, URL ', url)
        er.blob = blob
        raise er from e
    try:
        info = json.loads(text)
        return info
    except Exception as e:
        er = err.ParseJSONError('parse json text failed, URL ', url)
        er.text = text
        raise er from e

# TODO now only support 'utf-8' encoding
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def __init__(self, credentials, host, request_uri, headers, response, content, http):
        from urllib.parse import urlencode
        Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
        challenge = _parse_www_authenticate(response, 'www-authenticate')
        service = challenge['googlelogin'].get('service', 'xapi')
        # Bloggger actually returns the service in the challenge
        # For the rest we guess based on the URI
        if service == 'xapi' and  request_uri.find("calendar") > 0:
            service = "cl"
        # No point in guessing Base or Spreadsheet
        #elif request_uri.find("spreadsheets") > 0:
        #    service = "wise"

        auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
        resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
        lines = content.split('\n')
        d = dict([tuple(line.split("=", 1)) for line in lines if line])
        if resp.status == 403:
            self.Auth = ""
        else:
            self.Auth = d['Auth']
项目:bitex    作者:nlsdfnbch    | 项目源码 | 文件源码
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):
        try:
            req = kwargs['params']
        except KeyError:
            req = {}

        req['nonce'] = self.nonce()
        postdata = urllib.parse.urlencode(req)

        # Unicode-objects must be encoded before hashing
        encoded = (str(req['nonce']) + postdata).encode('utf-8')
        message = (endpoint_path.encode('utf-8') +
                   hashlib.sha256(encoded).digest())

        signature = hmac.new(base64.b64decode(self.secret),
                             message, hashlib.sha512)
        sigdigest = base64.b64encode(signature.digest())

        headers = {
            'API-Key': self.key,
            'API-Sign': sigdigest.decode('utf-8')
        }

        return url, {'data': req, 'headers': headers}
项目:bitex    作者:nlsdfnbch    | 项目源码 | 文件源码
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
        nonce = self.nonce()
        try:
            params = kwargs['params']
        except KeyError:
            params = {}

        params['apikey'] = self.key
        params['nonce'] = nonce
        post_params = params
        post_params.update({'nonce': nonce, 'method': endpoint})
        post_params = urllib.parse.urlencode(post_params)

        url = uri + post_params

        sig = hmac.new(url, self.secret, hashlib.sha512)
        headers = {'apisign': sig}

        return url, {'headers': headers}
项目:bitex    作者:nlsdfnbch    | 项目源码 | 文件源码
def sign(self, url, endpoint, endpoint_path, method_verb, *args, **kwargs):

        try:
            params = kwargs['params']
        except KeyError:
            params = {}

        nonce = self.nonce()

        req_string = endpoint_path + '?apikey=' + self.key + "&nonce=" + nonce + '&'
        req_string += urllib.parse.urlencode(params)
        headers = {"apisign": hmac.new(self.secret.encode('utf-8'),
                                       (self.uri + req_string).encode('utf-8'),
                                       hashlib.sha512).hexdigest()}

        return self.uri + req_string, {'headers': headers, 'params': {}}
项目:jupyterhub-kubernetes_spawner    作者:danielfrg    | 项目源码 | 文件源码
def __deserialize_date(self, string):
        """
        Deserializes string to date.

        :param string: str.
        :return: date.
        """
        try:
            from dateutil.parser import parse
            return parse(string).date()
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a date object"
                .format(string)
            )
项目:jupyterhub-kubernetes_spawner    作者:danielfrg    | 项目源码 | 文件源码
def __deserialize_datatime(self, string):
        """
        Deserializes string to datetime.

        The string should be in iso8601 datetime format.

        :param string: str.
        :return: datetime.
        """
        try:
            from dateutil.parser import parse
            return parse(string)
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a datetime object".
                format(string)
            )
项目:ZoomEye    作者:felicitychou    | 项目源码 | 文件源码
def get(url,headers={},params=None):

    if sys.version_info[0] == 3:
        if params:
            data = parse.urlencode(params)
            url = "%s?%s" % (url, data)
            req = request.Request(url=url,headers=headers)
        else:
            req = request.Request(url=url,headers=headers)
        response = request.urlopen(req)
        return Response(response)
    elif sys.version_info[0] == 2:
        if params:
            data = urllib.urlencode(params)
            url = url + '?' + data
            req = urllib2.Request(url=url,headers=headers)
        else:
            req = urllib2.Request(url=url,headers=headers)
        response = urllib2.urlopen(req)
        return Response(response)
    else:
        return None
项目:web-scraping-101    作者:turtleDev    | 项目源码 | 文件源码
def parse(self, response, crawler):

        document = lxml.etree.HTML(response.text)
        for title in document.cssselect('tr.athing a.storylink'):
            yield title.text

        urlinfo = urllib.parse.urlparse(response.url)
        base_url = urlinfo.scheme + '://' + urlinfo.netloc

        try:
            href = document.cssselect('a.morelink')[0].get('href')
        except:
            return

        next_url = urllib.parse.urljoin(base_url, href)

        crawler.schedule_request(next_url)
项目:graylog.py    作者:yumimobi    | 项目源码 | 文件源码
def __deserialize_date(self, string):
        """
        Deserializes string to date.

        :param string: str.
        :return: date.
        """
        try:
            from dateutil.parser import parse
            return parse(string).date()
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a date object"
                .format(string)
            )
项目:graylog.py    作者:yumimobi    | 项目源码 | 文件源码
def __deserialize_datatime(self, string):
        """
        Deserializes string to datetime.

        The string should be in iso8601 datetime format.

        :param string: str.
        :return: datetime.
        """
        try:
            from dateutil.parser import parse
            return parse(string)
        except ImportError:
            return string
        except ValueError:
            raise ApiException(
                status=0,
                reason="Failed to parse `{0}` into a datetime object".
                format(string)
            )
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, url=None, *args, **kwargs):
        super(Bazaar, self).__init__(url, *args, **kwargs)
        # Python >= 2.7.4, 3.3 doesn't have uses_fragment or non_hierarchical
        # Register lp but do not expose as a scheme to support bzr+lp.
        if getattr(urllib_parse, 'uses_fragment', None):
            urllib_parse.uses_fragment.extend(['lp'])
            urllib_parse.non_hierarchical.extend(['lp'])
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def __init__(self, url=None, *args, **kwargs):
        super(Bazaar, self).__init__(url, *args, **kwargs)
        # Python >= 2.7.4, 3.3 doesn't have uses_fragment or non_hierarchical
        # Register lp but do not expose as a scheme to support bzr+lp.
        if getattr(urllib_parse, 'uses_fragment', None):
            urllib_parse.uses_fragment.extend(['lp'])
            urllib_parse.non_hierarchical.extend(['lp'])