Python urllib2 模块,parse() 实例源码


项目:flattools    作者:adsharma    | 项目源码 | 文件源码
def __init__(self, uri, timeout=None, ssl_context_factory=None):
        """Initialize a HTTP Socket.

        @param uri(str)    The http_scheme://host:port/path to connect to;
                           the scheme must be 'http' or 'https'.
        @param timeout   timeout in ms
        @param ssl_context_factory   stored unchanged on the instance as
                           ``_ssl_context_factory``; not used in this method.
        """
        parsed = urllib.parse.urlparse(uri)
        self.scheme = parsed.scheme
        assert self.scheme in ('http', 'https')
        # Fall back to the scheme's default port when the URI names none.
        if self.scheme == 'http':
            self.port = parsed.port or http_client.HTTP_PORT
        elif self.scheme == 'https':
            self.port = parsed.port or http_client.HTTPS_PORT
        # NOTE(review): the original line fused two statements and lost the
        # assignment target for the hostname; ``self.host`` is assumed --
        # confirm against the rest of the class.
        self.host = parsed.hostname
        self.path = parsed.path
        if parsed.query:
            # Keep the query string as part of the request path.
            self.path += '?%s' % parsed.query
        self.__wbuf = BytesIO()
        self.__http = None
        self.__custom_headers = None
        self.__timeout = None
        if timeout:
            # NOTE(review): the body of this branch was missing; delegating to
            # a ``setTimeout`` method is an assumption -- confirm the class
            # defines it and that it expects milliseconds.
            self.setTimeout(timeout)
        self._ssl_context_factory = ssl_context_factory
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def parse_sphinx_searchindex(searchindex):
    """Parse a Sphinx search index

    Parameters
    ----------
    searchindex : str
        The Sphinx search index (contents of searchindex.js)

    Returns
    -------
    filenames : list of str
        The file names parsed from the search index.
    objects : dict
        The objects parsed from the search index.

    Raises
    ------
    ValueError
        If the ``objects:`` or ``filenames:`` marker is not present.
    """
    # Make sure searchindex uses UTF-8 encoding
    if hasattr(searchindex, 'decode'):
        searchindex = searchindex.decode('UTF-8')

    # parse objects
    query = 'objects:'
    pos = searchindex.find(query)
    if pos < 0:
        raise ValueError('"objects:" not found in search index')

    # The block helper receives everything from the marker onwards and is
    # asked for the '{' ... '}' delimited span.
    sel = _select_block(searchindex[pos:], '{', '}')
    objects = _parse_dict_recursive(sel)

    # parse filenames
    query = 'filenames:'
    pos = searchindex.find(query)
    if pos < 0:
        raise ValueError('"filenames:" not found in search index')
    # Skip the marker plus one more character, then cut at the first ']'.
    filenames = searchindex[pos + len(query) + 1:]
    filenames = filenames[:filenames.find(']')]
    filenames = [f.strip('"') for f in filenames.split(',')]

    return filenames, objects
项目:multi-diffusion    作者:chemical-diffusion    | 项目源码 | 文件源码
def parse_sphinx_searchindex(searchindex):
    """Parse a Sphinx search index

    Parameters
    ----------
    searchindex : str
        The Sphinx search index (contents of searchindex.js)

    Returns
    -------
    filenames : list of str
        The file names parsed from the search index.
    objects : dict
        The objects parsed from the search index.

    Raises
    ------
    ValueError
        If the ``objects:`` or ``filenames:`` marker is not present.
    """
    # Make sure searchindex uses UTF-8 encoding
    if hasattr(searchindex, 'decode'):
        searchindex = searchindex.decode('UTF-8')

    # parse objects
    query = 'objects:'
    pos = searchindex.find(query)
    if pos < 0:
        raise ValueError('"objects:" not found in search index')

    # The block helper receives everything from the marker onwards and is
    # asked for the '{' ... '}' delimited span.
    sel = _select_block(searchindex[pos:], '{', '}')
    objects = _parse_dict_recursive(sel)

    # parse filenames
    query = 'filenames:'
    pos = searchindex.find(query)
    if pos < 0:
        raise ValueError('"filenames:" not found in search index')
    # Skip the marker plus one more character, then cut at the first ']'.
    filenames = searchindex[pos + len(query) + 1:]
    filenames = filenames[:filenames.find(']')]
    filenames = [f.strip('"') for f in filenames.split(',')]

    return filenames, objects
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def s3upload(presigned_url, filename):
    """
    Upload the file named filename to S3 with an HTTP PUT to the given presigned URL.

    Parameters:
        presigned_url (str): the presigned URL that is the PUT endpoint
        filename (str): the file to be pushed to S3

    Returns:
        (bool): True if the PUT returned HTTP 200. Failures are reported by raising, not by returning False.

    Raises:
        S3UploadResponseError: the PUT returned a status code other than 200.
        requests.exceptions.Timeout: the request timed out.
        S3UploadTarfileReadError: an IOError was raised while opening or sending the file.
    """
    # NOTE(review): nothing visible in this function uses logging; the import is kept because the statement
    # that used it appears to be missing from the source under review.
    import logging

    try:
        # The file is opened in binary mode and handed to requests as the request body.
        with open(filename, "rb") as payload:
            response = requests.put(url=presigned_url, data=payload)
            if response.status_code == 200:
                ec2rlcore.dual_log("Upload successful")
                return True
            else:
                ec2rlcore.dual_log("ERROR: Upload failed.  Received response {}".format(
                    response.status_code))
                raise S3UploadResponseError(response.status_code)
    except requests.exceptions.Timeout:
        raise requests.exceptions.Timeout("ERROR: connection timed out.")
    # NOTE(review): requests' own exceptions may also derive from IOError, in which case non-timeout network
    # errors would be reported as a file read problem here -- confirm this is intended.
    except IOError:
        raise S3UploadTarfileReadError("ERROR: there was an issue reading the file '{}'.".format(filename))
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def __init__(self, error_message, *args):
        """Build the exception with a "Failed to parse URL" message.

        ``error_message`` is interpolated into the message text; any extra
        positional arguments are forwarded to the parent initializer after
        the message.
        """
        super(S3UploadUrlParsingFailure, self).__init__(
            "Failed to parse URL: {}".format(error_message), *args)
项目:dyfunconn    作者:makism    | 项目源码 | 文件源码
def identify_names(code):
    """Builds a codeobj summary by identifying and resolving used names

    >>> code = '''
    ... from a.b import c
    ... import d as e
    ... print(c)
    ... e.HelloWorld().f.g
    ... '''
    >>> for name, o in sorted(identify_names(code).items()):
    ...     print(name, o['name'], o['module'], o['module_short'])
    c c a.b a.b
    e.HelloWorld HelloWorld d d
    """
    import ast

    finder = NameFinder()
    # NOTE(review): the source under review never passed ``code`` to the
    # finder; walking the parsed AST is assumed here (NameFinder is presumably
    # an ast.NodeVisitor) -- confirm against its definition.
    finder.visit(ast.parse(code))

    example_code_obj = {}
    for name, full_name in finder.get_mapping():
        # name is as written in file (e.g. np.asarray)
        # full_name includes resolved import path (e.g. numpy.asarray)
        module, attribute = full_name.rsplit('.', 1)
        # get shortened module name
        module_short = get_short_module_name(module, attribute)
        cobj = {'name': attribute, 'module': module,
                'module_short': module_short}
        example_code_obj[name] = cobj
    return example_code_obj
项目:polylearn    作者:scikit-learn-contrib    | 项目源码 | 文件源码
def identify_names(code):
    """Builds a codeobj summary by identifying and resolving used names

    >>> code = '''
    ... from a.b import c
    ... import d as e
    ... print(c)
    ... e.HelloWorld().f.g
    ... '''
    >>> for name, o in sorted(identify_names(code).items()):
    ...     print(name, o['name'], o['module'], o['module_short'])
    c c a.b a.b
    e.HelloWorld HelloWorld d d
    """
    import ast

    finder = NameFinder()
    # NOTE(review): the source under review never passed ``code`` to the
    # finder; walking the parsed AST is assumed here (NameFinder is presumably
    # an ast.NodeVisitor) -- confirm against its definition.
    finder.visit(ast.parse(code))

    example_code_obj = {}
    for name, full_name in finder.get_mapping():
        # name is as written in file (e.g. np.asarray)
        # full_name includes resolved import path (e.g. numpy.asarray)
        module, attribute = full_name.rsplit('.', 1)
        # get shortened module name
        module_short = get_short_module_name(module, attribute)
        cobj = {'name': attribute, 'module': module,
                'module_short': module_short}
        example_code_obj[name] = cobj
    return example_code_obj
项目:flattools    作者:adsharma    | 项目源码 | 文件源码
def flush(self):
        """Send the buffered bytes as one HTTP POST and record the response.

        The write buffer is drained and replaced with an empty one, the
        request is issued on ``self.__http``, and the response's status,
        message and headers are stored on ``self.code``, ``self.message`` and
        ``self.headers``; the response object itself is kept on
        ``self.response``.
        """
        # NOTE(review): the body of this branch was missing in the source
        # under review; re-opening the connection for every flush via
        # ``close()``/``open()`` is an assumption -- confirm the class
        # defines both.
        if self.isOpen():
            self.close()
        self.open()

        # Pull data out of buffer
        data = self.__wbuf.getvalue()
        self.__wbuf = BytesIO()

        # HTTP request
        self.__http.putrequest('POST', self.path, skip_host=True)

        # Write headers
        # skip_host=True suppresses the automatic Host header, so it is
        # written explicitly.
        # NOTE(review): assumes ``self.host`` is set by the constructor.
        self.__http.putheader('Host', self.host)
        self.__http.putheader('Content-Type', 'application/x-thrift')
        self.__http.putheader('Content-Length', str(len(data)))

        if (not self.__custom_headers or
                'User-Agent' not in self.__custom_headers):
            user_agent = 'Python/THttpClient'
            script = os.path.basename(sys.argv[0])
            if script:
                user_agent = '%s (%s)' % (
                    user_agent, urllib.parse.quote(script))
            # Sent whether or not a script name is available; previously the
            # header was skipped entirely when the script name was empty.
            self.__http.putheader('User-Agent', user_agent)

        if self.__custom_headers:
            for key, val in self.__custom_headers.items():
                self.__http.putheader(key, val)

        # The header section must be terminated before the body is sent.
        self.__http.endheaders()

        # Write payload
        self.__http.send(data)

        # Get reply to flush the request
        response = self.__http.getresponse()
        self.code, self.message, self.headers = (
            response.status, response.msg, response.getheaders())
        self.response = response
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def identify_names(code):
    """Builds a codeobj summary by identifying and resolving used names

    >>> code = '''
    ... from a.b import c
    ... import d as e
    ... print(c)
    ... e.HelloWorld().f.g
    ... '''
    >>> for name, o in sorted(identify_names(code).items()):
    ...     print(name, o['name'], o['module'], o['module_short'])
    c c a.b a.b
    e.HelloWorld HelloWorld d d
    """
    import ast

    finder = NameFinder()
    # NOTE(review): the source under review never passed ``code`` to the
    # finder; walking the parsed AST is assumed here (NameFinder is presumably
    # an ast.NodeVisitor) -- confirm against its definition.
    finder.visit(ast.parse(code))

    example_code_obj = {}
    for name, full_name in finder.get_mapping():
        # name is as written in file (e.g. np.asarray)
        # full_name includes resolved import path (e.g. numpy.asarray)
        module, attribute = full_name.rsplit('.', 1)
        # get shortened module name
        module_short = get_short_module_name(module, attribute)
        cobj = {'name': attribute, 'module': module,
                'module_short': module_short}
        example_code_obj[name] = cobj
    return example_code_obj
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def get_presigned_url(uploader_url, filename, region="us-east-1"):
    """
    Given an uploader URL and a filename, parse the URL's query string and request a presigned URL for the file
    named filename. This function does not upload the file itself.

    Parameters:
        uploader_url (str): the uploader URL provided by the AWS engineer
        filename (str): the file to be pushed to S3
        region (str): the S3 endpoint the file should be uploaded to; any value outside the supported list
        falls back to "us-east-1"

    Returns:
        presigned_url (str): the presigned URL as a string

    Raises:
        S3UploadUrlParsingFailure: no query parameters could be obtained from uploader_url.
        S3UploadGetPresignedURLError: the endpoint returned a status code other than 200.
        requests.exceptions.Timeout: the request to the endpoint timed out.
    """
    # NOTE(review): the endpoint is an empty string in the source under review; the real service URL must be
    # supplied here before this function can succeed.
    endpoint = ""

    s3uploader_regions = ["us-east-1", "eu-west-1", "ap-northeast-1"]
    if region not in s3uploader_regions:
        region = "us-east-1"

    query_str = urlparse.urlparse(uploader_url).query
    put_dict = urlparse.parse_qs(query_str)

    # If uploader_url is not parsable then maybe it is a shortened URL
    try:
        if not put_dict:
            # Follow the URL and re-parse the query string of wherever it finally resolves to.
            the_request = urllib_request.Request(uploader_url)
            uploader_url = urllib_urlopen(the_request).geturl()
            query_str = urlparse.urlparse(uploader_url).query
            put_dict = urlparse.parse_qs(query_str)
    except urllib_urlerror:
        # Best effort only: put_dict stays empty and the parsing failure is raised below.
        pass

    if put_dict:
        # urlparse.parse_qs returns dict values that are single value lists
        # Reassign the values to the first value in the list
        put_dict["accountId"] = put_dict["account-id"][0]
        put_dict["caseId"] = put_dict["case-id"][0]
        put_dict["key"] = put_dict["key"][0]
        put_dict["expiration"] = int(put_dict["expiration"][0])
        put_dict["fileName"] = filename
        put_dict["region"] = region
    else:
        raise S3UploadUrlParsingFailure(uploader_url)

    json_payload = json.dumps(put_dict)

    try:
        # NOTE(review): the call on this line was garbled in the source under review (only the trailing
        # "data=json_payload)" survived); a POST of the JSON payload to the endpoint is assumed -- confirm the
        # HTTP method.
        response = requests.post(url=endpoint, data=json_payload)
        # On success the response body is the presigned URL that the caller uses for the actual upload.
        if response.status_code == 200:
            presigned_url = response.text
            return presigned_url
        else:
            raise S3UploadGetPresignedURLError(response.status_code, uploader_url)
    except requests.exceptions.Timeout:
        raise requests.exceptions.Timeout("ERROR: Connection timed out.")