我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用urllib2.parse()。
def __init__(self, uri, timeout=None, ssl_context_factory=None): """Initialize a HTTP Socket. @param uri(str) The http_scheme:://host:port/path to connect to. @param timeout timeout in ms """ parsed = urllib.parse.urlparse(uri) self.scheme = parsed.scheme assert self.scheme in ('http', 'https') if self.scheme == 'http': self.port = parsed.port or http_client.HTTP_PORT elif self.scheme == 'https': self.port = parsed.port or http_client.HTTPS_PORT self.host = parsed.hostname self.path = parsed.path if parsed.query: self.path += '?%s' % parsed.query self.__wbuf = BytesIO() self.__http = None self.__custom_headers = None self.__timeout = None if timeout: self.setTimeout(timeout) self._ssl_context_factory = ssl_context_factory
def parse_sphinx_searchindex(searchindex): """Parse a Sphinx search index Parameters ---------- searchindex : str The Sphinx search index (contents of searchindex.js) Returns ------- filenames : list of str The file names parsed from the search index. objects : dict The objects parsed from the search index. """ # Make sure searchindex uses UTF-8 encoding if hasattr(searchindex, 'decode'): searchindex = searchindex.decode('UTF-8') # parse objects query = 'objects:' pos = searchindex.find(query) if pos < 0: raise ValueError('"objects:" not found in search index') sel = _select_block(searchindex[pos:], '{', '}') objects = _parse_dict_recursive(sel) # parse filenames query = 'filenames:' pos = searchindex.find(query) if pos < 0: raise ValueError('"filenames:" not found in search index') filenames = searchindex[pos + len(query) + 1:] filenames = filenames[:filenames.find(']')] filenames = [f.strip('"') for f in filenames.split(',')] return filenames, objects
def s3upload(presigned_url, filename): """ Given an uploader URL and a filename, parse the URL and get the presigned URL then use the presigned URL to upload the file named filename to S3. Parameters: presigned_url (str): the presigned URL that that is the PUT endpoint filename (str): the file to be pushed to S3 Returns: (bool): True if the presigned URL was generated and the file uploaded successfully, else False. """ import logging ec2rlcore.logutil.LogUtil.get_root_logger().addHandler(logging.NullHandler()) try: # The response puts the URL string in double quotes so cut off the first and last characters with open(filename, "rb") as payload: response = requests.put(url=presigned_url, data=payload) if response.status_code == 200: ec2rlcore.dual_log("Upload successful") return True else: ec2rlcore.dual_log("ERROR: Upload failed. Received response {}".format( response.status_code)) raise S3UploadResponseError(response.status_code) except requests.exceptions.Timeout: raise requests.exceptions.Timeout("ERROR: connection timed out.") except IOError: raise S3UploadTarfileReadError("ERROR: there was an issue reading the file '{}'.".format(filename))
def __init__(self, error_message, *args): message = "Failed to parse URL: {}".format(error_message) super(S3UploadUrlParsingFailure, self).__init__(message, *args)
def identify_names(code): """Builds a codeobj summary by identifying and resovles used names >>> code = ''' ... from a.b import c ... import d as e ... print(c) ... e.HelloWorld().f.g ... ''' >>> for name, o in sorted(identify_names(code).items()): ... print(name, o['name'], o['module'], o['module_short']) c c a.b a.b e.HelloWorld HelloWorld d d """ finder = NameFinder() finder.visit(ast.parse(code)) example_code_obj = {} for name, full_name in finder.get_mapping(): # name is as written in file (e.g. np.asarray) # full_name includes resolved import path (e.g. numpy.asarray) module, attribute = full_name.rsplit('.', 1) # get shortened module name module_short = get_short_module_name(module, attribute) cobj = {'name': attribute, 'module': module, 'module_short': module_short} example_code_obj[name] = cobj return example_code_obj
def flush(self): if self.isOpen(): self.close() self.open() # Pull data out of buffer data = self.__wbuf.getvalue() self.__wbuf = BytesIO() # HTTP request self.__http.putrequest('POST', self.path, skip_host=True) # Write headers self.__http.putheader('Host', self.host) self.__http.putheader('Content-Type', 'application/x-thrift') self.__http.putheader('Content-Length', str(len(data))) if (not self.__custom_headers or 'User-Agent' not in self.__custom_headers): user_agent = 'Python/THttpClient' script = os.path.basename(sys.argv[0]) if script: user_agent = '%s (%s)' % ( user_agent, urllib.parse.quote(script)) self.__http.putheader('User-Agent', user_agent) if self.__custom_headers: for key, val in self.__custom_headers.items(): self.__http.putheader(key, val) self.__http.endheaders() # Write payload self.__http.send(data) # Get reply to flush the request response = self.__http.getresponse() self.code, self.message, self.headers = ( response.status, response.msg, response.getheaders()) self.response = response
def get_presigned_url(uploader_url, filename, region="us-east-1"): """ Given an uploader URL and a filename, parse the URL and get the presigned URL then use the presigned URL to upload the file named filename to S3. Parameters: uploader_url (str): the uploader URL provided by the AWS engineer filename (str): the file to be pushed to S3 region (str): the S3 endpoint the file should be uploaded to Returns: presigned_url (str): the presigned URL as a string """ endpoint = "https://30yinsv8k6.execute-api.us-east-1.amazonaws.com/prod/get-signed-url" s3uploader_regions = ["us-east-1", "eu-west-1", "ap-northeast-1"] if region not in s3uploader_regions: region = "us-east-1" query_str = urlparse.urlparse(uploader_url).query put_dict = urlparse.parse_qs(query_str) # If uploader_url is not parsable then maybe it is a shortened URL try: if not put_dict: the_request = urllib_request.Request(uploader_url) uploader_url = urllib_urlopen(the_request).geturl() query_str = urlparse.urlparse(uploader_url).query put_dict = urlparse.parse_qs(query_str) except urllib_urlerror: pass if put_dict: # urlparse.parse_qs returns dict values that are single value lists # Reassign the values to the first value in the list put_dict["accountId"] = put_dict["account-id"][0] put_dict["caseId"] = put_dict["case-id"][0] put_dict["key"] = put_dict["key"][0] put_dict["expiration"] = int(put_dict["expiration"][0]) put_dict["fileName"] = filename put_dict["region"] = region else: raise S3UploadUrlParsingFailure(uploader_url) json_payload = json.dumps(put_dict) try: response = requests.post(url=endpoint, data=json_payload) # If the initial put was successful then proceed with uploading the file using the returned presigned URL if response.status_code == 200: presigned_url = response.text return presigned_url else: raise S3UploadGetPresignedURLError(response.status_code, uploader_url) except requests.exceptions.Timeout: raise requests.exceptions.Timeout("ERROR: Connection timed out.")