We extracted the following 47 code examples from open-source Python projects to illustrate how to use six.moves.urllib.parse.urlunparse().
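Before the project examples, here is a minimal sketch (not taken from any of the projects below; the URL is made up for illustration) of the pattern they all share: parse a URL into its six components with urlparse(), replace one or more components, then reassemble the string with urlunparse().

from six.moves.urllib import parse as urlparse

# Parse a URL into (scheme, netloc, path, params, query, fragment).
parsed = urlparse.urlparse('http://example.com/api/v1/tasks?limit=10#top')

# Swap out the query string, keep everything else, and rebuild the URL.
rebuilt = urlparse.urlunparse(
    (parsed.scheme, parsed.netloc, parsed.path,
     parsed.params, 'limit=20', parsed.fragment))

print(rebuilt)  # http://example.com/api/v1/tasks?limit=20#top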
def sort_url_by_query_keys(url):
    """A helper function which sorts the keys of the query string of a url.

    For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
    returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
    prevent non-deterministic ordering of the query string causing
    problems with unit tests.

    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    queries = urlparse.parse_qsl(parsed.query, True)
    sorted_query = sorted(queries, key=lambda x: x[0])

    encoded_sorted_query = urlparse.urlencode(sorted_query, True)

    url_parts = (parsed.scheme, parsed.netloc, parsed.path,
                 parsed.params, encoded_sorted_query,
                 parsed.fragment)

    return urlparse.urlunparse(url_parts)
def validate_link(self, link, bookmark=False):
    """Checks if the given link can get correct data."""
    # removes the scheme and net location parts of the link
    url_parts = list(urlparse.urlparse(link))
    url_parts[0] = url_parts[1] = ''

    # bookmark link should not have the version in the URL
    if bookmark and url_parts[2].startswith(PATH_PREFIX):
        return False

    full_path = urlparse.urlunparse(url_parts)
    try:
        self.get_json(full_path, path_prefix='')
        return True
    except Exception:
        return False
def extract_image_url(text):
    text = _strip_url(text)
    imgurl = None
    if text:
        # check if the text is style content
        match = _CSS_IMAGERE.search(text)
        text = match.groups()[0] if match else text
        parsed = urlparse(text)
        path = None
        match = _IMAGE_PATH_RE.search(parsed.path)
        if match:
            path = match.group()
        elif parsed.query:
            match = _GENERIC_PATH_RE.search(parsed.path)
            if match:
                path = match.group()
        if path is not None:
            parsed = list(parsed)
            parsed[2] = path
            imgurl = urlunparse(parsed)
    if not imgurl:
        imgurl = text
    return imgurl
def _fix_esx_url(url, host, port):
    """Fix netloc in the case of an ESX host.

    In the case of an ESX host, the netloc is set to '*' in the URL
    returned in HttpNfcLeaseInfo. It should be replaced with host name
    or IP address.
    """
    urlp = urlparse.urlparse(url)
    if urlp.netloc == '*':
        scheme, netloc, path, params, query, fragment = urlp
        if netutils.is_valid_ipv6(host):
            netloc = '[%s]:%d' % (host, port)
        else:
            netloc = "%s:%d" % (host, port)
        url = urlparse.urlunparse((scheme, netloc, path, params, query,
                                   fragment))
    return url
def _decorate_request(self, filters, method, url, headers=None, body=None,
                      auth_data=None):
    if auth_data is None:
        auth_data = self.auth_data
    token, _ = auth_data
    # base_url = self.base_url(filters=filters, auth_data=auth_data)
    base_url = url
    # build authenticated request
    # returns new request, it does not touch the original values
    _headers = copy.deepcopy(headers) if headers is not None else {}
    _headers['X-Auth-Token'] = str(token)
    if url is None or url == "":
        _url = base_url
    else:
        # Join base URL and url, and remove multiple contiguous slashes
        _url = "/".join([base_url, url])
        parts = [x for x in urlparse.urlparse(_url)]
        parts[2] = re.sub("/{2,}", "/", parts[2])
        _url = urlparse.urlunparse(parts)
    # no change to method or body
    return str(_url), _headers, body
def config():
    answer = input("Please enter the URL for your custom Quilt registry (ask your administrator),\n" +
                   "or leave this line blank to use the default registry: ")
    if answer:
        url = urlparse(answer.rstrip('/'))
        if (url.scheme not in ['http', 'https'] or not url.netloc or
                url.path or url.params or url.query or url.fragment):
            raise CommandException("Invalid URL: %s" % answer)
        canonical_url = urlunparse(url)
    else:
        # When saving the config, store '' instead of the actual URL in case we ever change it.
        canonical_url = ''

    cfg = _load_config()
    cfg['registry_url'] = canonical_url
    _save_config(cfg)

    # Clear the cached URL.
    global _registry_url
    _registry_url = None
def ticket_request_url(
        url, fmt=None, reference_name=None, reference_md5=None,
        start=None, end=None, fields=None, tags=None, notags=None,
        data_format=None):
    parsed_url = urlparse(url)
    get_vars = parse_qs(parsed_url.query)
    # TODO error checking
    if reference_name is not None:
        get_vars["referenceName"] = reference_name
    if reference_md5 is not None:
        get_vars["referenceMD5"] = reference_md5
    if start is not None:
        get_vars["start"] = int(start)
    if end is not None:
        get_vars["end"] = int(end)
    if data_format is not None:
        get_vars["format"] = data_format.upper()
    # if fields is not None:
    #     get_vars["fields"] = ",".join(fields)
    # if tags is not None:
    #     get_vars["tags"] = ",".join(tags)
    # if notags is not None:
    #     get_vars["notags"] = ",".join(notags)
    new_url = list(parsed_url)
    new_url[4] = urlencode(get_vars, doseq=True)
    return urlunparse(new_url)
def run(self):
    self.__retry(self._handle_ticket_request)
    self.data_format = self.ticket.get("format", "BAM")
    self.md5 = self.ticket.get("md5", None)
    for url_object in self.ticket["urls"]:
        url = urlparse(url_object["url"])
        if url.scheme.startswith("http"):
            headers = url_object.get("headers", "")
            self.__retry(self._handle_http_url, urlunparse(url), headers)
        elif url.scheme == "data":
            self._handle_data_uri(url)
        else:
            raise ValueError("Unsupported URL scheme:{}".format(url.scheme))
def __init__(self, url, conf):
    db_name = 'panko_%s' % uuidutils.generate_uuid(dashed=False)
    engine = sqlalchemy.create_engine(url)
    conn = engine.connect()
    self._create_database(conn, db_name)
    conn.close()
    engine.dispose()
    parsed = list(urlparse.urlparse(url))
    parsed[2] = '/' + db_name
    self.url = urlparse.urlunparse(parsed)
    self.conf = conf
def iter_json_batches(inputs, base_url, batch_size, keep_instance_path):
    parsed_base_url = urlparse(base_url)

    current_uri = None
    current_batch = []

    for href, resource in inputs:
        # Skip over links-only (discovery) resources
        if resource.keys() == ["_links"]:
            continue

        # Inject the base URL's scheme and netloc; `urljoin` should do exactly
        # this operation, but actually won't if the right-hand-side term
        # defines its own netloc
        parsed_href = urlparse(href)
        uri = urlunparse(parsed_href._replace(
            scheme=parsed_base_url.scheme,
            netloc=parsed_base_url.netloc,
        ))

        if batch_size == 1:
            yield (uri, [resource])
        else:
            # batch handling
            if keep_instance_path:
                collection_uri = uri.rsplit("?", 1)[0]
            else:
                collection_uri = uri.rsplit("/", 1)[0]

            if any((
                current_uri is not None and current_uri != collection_uri,
                len(current_batch) >= batch_size,
            )):
                yield (current_uri, current_batch)
                current_batch = []

            current_uri = collection_uri
            current_batch.append(resource)

    if current_batch:
        yield (current_uri, current_batch)
def get_websocket_url(url):
    parsed_url = urlparse(url)
    parts = list(parsed_url)
    if parsed_url.scheme == 'http':
        parts[0] = 'ws'
    elif parsed_url.scheme == 'https':
        parts[0] = 'wss'
    return urlunparse(parts)
def _build_conf_providers(self):

    def _schemed_url(uri):
        uri = uri.strip('/')
        return urlparse.urlparse(
            uri if uri.startswith('http') else
            "%s://%s" % (self._http_provider.default_scheme, uri))

    conf_urls = self.nsxlib_config.nsx_api_managers[:]
    urls = []
    providers = []
    provider_index = -1
    for conf_url in conf_urls:
        provider_index += 1
        conf_url = _schemed_url(conf_url)
        if conf_url in urls:
            LOG.warning("'%s' already defined in configuration file. "
                        "Skipping.", urlparse.urlunparse(conf_url))
            continue
        urls.append(conf_url)
        providers.append(
            Provider(
                conf_url.netloc,
                urlparse.urlunparse(conf_url),
                self.nsxlib_config.username(provider_index),
                self.nsxlib_config.password(provider_index),
                self.nsxlib_config.ca_file(provider_index)))
    return providers
def get_endpoint_from_parsed_url(parsed_url):
    # Keep only scheme and netloc (indices 0 and 1); blank out the rest.
    url_list = [(lambda: x if e < 2 else '')()
                for e, x in enumerate(list(parsed_url))]
    return urlparse.urlunparse(url_list)
def _build_url(url, _params):
    """Build the actual URL to use."""
    # Support for unicode domain names and paths.
    scheme, netloc, path, params, query, fragment = urlparse(url)
    netloc = netloc.encode('idna').decode('utf-8')
    if not path:
        path = '/'

    if six.PY2:
        if isinstance(scheme, six.text_type):
            scheme = scheme.encode('utf-8')
        if isinstance(netloc, six.text_type):
            netloc = netloc.encode('utf-8')
        if isinstance(path, six.text_type):
            path = path.encode('utf-8')
        if isinstance(params, six.text_type):
            params = params.encode('utf-8')
        if isinstance(query, six.text_type):
            query = query.encode('utf-8')
        if isinstance(fragment, six.text_type):
            fragment = fragment.encode('utf-8')

    enc_params = _encode_params(_params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
        else:
            query = enc_params

    url = (urlunparse([scheme, netloc, path, params, query, fragment]))
    return url
def create_plugin(self, session, version, url, raw_status=None):
    """Handle default Keystone endpoint configuration

    Build the actual API endpoint from the scheme, host and port of the
    original auth URL and the rest from the returned version URL.
    """
    ver_u = urlparse.urlparse(url)

    # Only hack this if it is the default setting
    if ver_u.netloc.startswith('localhost'):
        auth_u = urlparse.urlparse(self.auth_url)
        # from original auth_url: scheme, netloc
        # from api_url: path, query (basically, the rest)
        url = urlparse.urlunparse((
            auth_u.scheme,
            auth_u.netloc,
            ver_u.path,
            ver_u.params,
            ver_u.query,
            ver_u.fragment,
        ))
        LOG.debug('Version URL updated: %s', url)

    return super(OSCGenericPassword, self).create_plugin(
        session=session,
        version=version,
        url=url,
        raw_status=raw_status,
    )
def relative(self, url_string):
    url_bits = urllib_parse.urlparse(url_string)
    return urllib_parse.urlunparse(('', '', url_bits.path,
                                    url_bits.params,
                                    url_bits.query,
                                    url_bits.fragment))
def start_requests(self):
    """
    In the scrapy doc there are two ways to tell scrapy where to begin
    crawling from.  One is start_requests, the other is start_urls, which
    is a shortcut to start_requests.

    Based on my experience, it is better to use start_requests instead of
    start_urls because in this method you can see how the request objects
    are created and how requests are yielded. You should keep it simple
    and try not to use some magic or it might confuse you.

    In this project, you have no need to change code in this method, just
    modify code in parse_entry_page.

    If you fully understand how scrapy works, then you are free to choose
    between start_requests and start_urls.
    """
    prefix = self.settings["WEB_APP_PREFIX"]
    result = parse.urlparse(prefix)
    base_url = parse.urlunparse(
        (result.scheme, result.netloc, "", "", "", "")
    )
    # Generate start url from config and self.entry, when you paste the code
    # to another spider you can just change self.entry and self.taskid
    url = parse.urljoin(base_url, self.entry)
    print(url)
    request = Request(url=url, callback=self.parse_entry_page)
    request.headers[
        'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
    request.headers[
        'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
    request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
    request.headers['Connection'] = 'keep-alive'
    request.headers['Host'] = '115.28.36.253:8000'
    request.headers['DNT'] = 1
    # request.cookies['token'] = '4TO4N49X81'
    yield request
def start_requests(self):
    """
    In the scrapy doc there are two ways to tell scrapy where to begin
    crawling from.  One is start_requests, the other is start_urls, which
    is a shortcut to start_requests.

    Based on my experience, it is better to use start_requests instead of
    start_urls because in this method you can see how the request objects
    are created and how requests are yielded. You should keep it simple
    and try not to use some magic or it might confuse you.

    In this project, you have no need to change code in this method, just
    modify code in parse_entry_page.

    If you fully understand how scrapy works, then you are free to choose
    between start_requests and start_urls.
    """
    prefix = self.settings["WEB_APP_PREFIX"]
    result = parse.urlparse(prefix)
    base_url = parse.urlunparse(
        (result.scheme, result.netloc, "", "", "", "")
    )
    # Generate start url from config and self.entry, when you paste the code
    # to another spider you can just change self.entry and self.taskid
    url = parse.urljoin(base_url, self.entry)
    request = Request(url=url, callback=self.parse_entry_page)
    request.headers[
        'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
    request.headers[
        'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
    request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
    request.headers['Connection'] = 'keep-alive'
    request.headers['Host'] = '115.28.36.253:8000'
    request.headers['DNT'] = 1
    yield request
def start_requests(self):
    """
    In the scrapy doc there are two ways to tell scrapy where to begin
    crawling from.  One is start_requests, the other is start_urls, which
    is a shortcut to start_requests.

    Based on my experience, it is better to use start_requests instead of
    start_urls because in this method you can see how the request objects
    are created and how requests are yielded. You should keep it simple
    and try not to use some magic or it might confuse you.

    In this project, you have no need to change code in this method, just
    modify code in parse_entry_page.

    If you fully understand how scrapy works, then you are free to choose
    between start_requests and start_urls.
    """
    prefix = self.settings["WEB_APP_PREFIX"]
    result = parse.urlparse(prefix)
    base_url = parse.urlunparse(
        (result.scheme, result.netloc, "", "", "", "")
    )
    # Generate start url from config and self.entry, when you paste the code
    # to another spider you can just change self.entry and self.taskid
    url = parse.urljoin(base_url, self.entry)
    print(url)
    request = Request(url=url, callback=self.parse_entry_page)
    request.headers[
        'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
    request.headers[
        'Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    request.headers['Accept-Encoding'] = 'gzip, deflate, sdch'
    request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
    request.headers['Connection'] = 'keep-alive'
    request.headers['Host'] = '115.28.36.253:8000'
    request.headers['DNT'] = 1
    yield request
def build_url(url, _params):
    """Build the actual URL to use."""
    # Support for unicode domain names and paths.
    scheme, netloc, path, params, query, fragment = urlparse(url)
    netloc = netloc.encode('idna').decode('utf-8')
    if not path:
        path = '/'

    if six.PY2:
        if isinstance(scheme, six.text_type):
            scheme = scheme.encode('utf-8')
        if isinstance(netloc, six.text_type):
            netloc = netloc.encode('utf-8')
        if isinstance(path, six.text_type):
            path = path.encode('utf-8')
        if isinstance(params, six.text_type):
            params = params.encode('utf-8')
        if isinstance(query, six.text_type):
            query = query.encode('utf-8')
        if isinstance(fragment, six.text_type):
            fragment = fragment.encode('utf-8')

    enc_params = encode_params(_params)
    if enc_params:
        if query:
            query = '%s&%s' % (query, enc_params)
        else:
            query = enc_params

    url = (urlunparse([scheme, netloc, path, params, query, fragment]))
    return url
def _parse_contents(self, response):
    # Wix pages aren't really parseable, so anytime we see them,
    # let's re-run it (depth-1) with an escaped-fragment to get the real html source
    if 'https://static.wixstatic.com/' in response.body and '_escaped_fragment_' not in response.url:
        parsed_url = urlparse(response.url)
        qs = parse_qs(parsed_url.query)
        qs['_escaped_fragment_'] = ''
        wix_scrapeable_url = urlunparse(
            (parsed_url.scheme, parsed_url.netloc, parsed_url.path,
             parsed_url.params, urlencode(qs), parsed_url.fragment)
        )
        response.meta['depth'] -= 1
        return [scrapy.Request(wix_scrapeable_url, self.parse)]
        return

    if not hasattr(response, 'selector'):
        logging.info('Skipping unknown file from: %s', response.url)
        return
    # Get all text contents of tags (unless they are script or style tags)
    text_contents = ' '.join(
        response.selector.xpath('//*[not(self::script|self::style)]/text()').extract()).lower()

    processed_text = event_classifier.StringProcessor(
        text_contents, regex_keywords.WORD_BOUNDARIES)
    wrong = processed_text.get_tokens(keywords.DANCE_WRONG_STYLE)
    good = processed_text.get_tokens(rules.STREET_STYLE)
    if (wrong or good):
        # print response.url, set(wrong), set(good)
        pass
def strip_fragment(self, path):
    result = urlparse(path)
    result = result._replace(fragment='')
    return six.text_type(urlunparse(result))
def url_http_to_https(value):
    parts = urlparse(value)
    if parts.scheme != 'http':
        return value

    # Check if the url contains ':80' and remove it if that is the case
    netloc_parts = parts.netloc.rsplit(':', 1)
    if len(netloc_parts) == 2 and netloc_parts[1] == '80':
        netloc = netloc_parts[0]
    else:
        netloc = parts.netloc
    return urlunparse(('https', netloc) + parts[2:])
def expand_redirect_url(starting_url, key, bucket):
    """ Add key and bucket parameters to starting URL query string. """
    parsed = urlparse.urlparse(starting_url)
    query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
    query.update([('key', key), ('bucket', bucket)])

    redirect_url = urlparse.urlunparse((
        parsed.scheme, parsed.netloc, parsed.path, parsed.params,
        urlparse.urlencode(query), None))

    return redirect_url
def get_repo_url(self):
    return urlunparse(("file", "", pathname2url(self.__dir), "", "", ""))
def __new__(cls, origin_url, create_repo=False, pkg_name=None,
            repo_props=EmptyDict, trans_id=None, noexecute=False,
            xport=None, pub=None, progtrack=None):
    scheme, netloc, path, params, query, fragment = \
        urlparse(origin_url, "http", allow_fragments=0)
    scheme = scheme.lower()

    if noexecute:
        scheme = "null"
    if scheme != "null" and (not xport or not pub):
        raise TransactionError("Caller must supply transport "
                               "and publisher.")
    if scheme not in cls.__schemes:
        raise TransactionRepositoryURLError(origin_url,
                                            scheme=scheme)
    if scheme.startswith("http") and not netloc:
        raise TransactionRepositoryURLError(origin_url, netloc=None)
    if scheme.startswith("file"):
        if netloc:
            raise TransactionRepositoryURLError(origin_url,
                msg="'{0}' contains host information, which "
                    "is not supported for filesystem "
                    "operations.".format(netloc))
        # as we're urlunparsing below, we need to ensure that
        # the path starts with only one '/' character, if any
        # are present
        if path.startswith("/"):
            path = "/" + path.lstrip("/")
    elif not path:
        raise TransactionRepositoryURLError(origin_url)

    # Rebuild the url with the sanitized components.
    origin_url = urlunparse((scheme, netloc, path, params,
                             query, fragment))

    return cls.__schemes[scheme](origin_url,
                                 create_repo=create_repo,
                                 pkg_name=pkg_name,
                                 repo_props=repo_props,
                                 trans_id=trans_id,
                                 xport=xport, pub=pub,
                                 progtrack=progtrack)
def parse_uri(uri, cwd=None):
    """Parse the repository location provided and attempt to transform it
    into a valid repository URI.

    'cwd' is the working directory to use to turn paths into an absolute
    path.  If not provided, the current working directory is used.
    """
    if uri.find("://") == -1 and not uri.startswith("file:/"):
        # Convert the file path to a URI.
        if not cwd:
            uri = os.path.abspath(uri)
        elif not os.path.isabs(uri):
            uri = os.path.normpath(os.path.join(cwd, uri))

        uri = urlunparse(("file", "", pathname2url(uri), "", "", ""))

    scheme, netloc, path, params, query, fragment = \
        urlparse(uri, "file", allow_fragments=0)
    scheme = scheme.lower()

    if scheme == "file":
        # During urlunparsing below, ensure that the path starts with
        # only one '/' character, if any are present.
        if path.startswith("/"):
            path = "/" + path.lstrip("/")

    # Rebuild the URI with the sanitized components.
    return urlunparse((scheme, netloc, path, params, query, fragment))
def execute(self, http=None, num_retries=0):
    """Execute the request.

    Args:
      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry 500's with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

    Returns:
      A deserialized object model of the response body as determined
      by the postproc.

    Raises:
      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occurred.
    """
    if http is None:
        http = self.http

    if self.resumable:
        body = None
        while body is None:
            _, body = self.next_chunk(http=http, num_retries=num_retries)
        return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
        self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
        self.method = 'POST'
        self.headers['x-http-method-override'] = 'GET'
        self.headers['content-type'] = 'application/x-www-form-urlencoded'
        parsed = urlparse(self.uri)
        self.uri = urlunparse(
            (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
             None)
        )
        self.body = parsed.query
        self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
        http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
        method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
        callback(resp)
    if resp.status >= 300:
        raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

    Args:
      request: HttpRequest, the request to serialize.

    Returns:
      The request as a string in application/http format.
    """
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    )
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None and hasattr(request.http.request, 'credentials'):
        request.http.request.credentials.apply(headers)

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
        del headers['content-type']

    for key, value in six.iteritems(headers):
        msg[key] = value
    msg['Host'] = parsed.netloc
    msg.set_unixfrom(None)

    if request.body is not None:
        msg.set_payload(request.body)
        msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
def execute(self, http=None, num_retries=0):
    """Execute the request.

    Args:
      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

    Returns:
      A deserialized object model of the response body as determined
      by the postproc.

    Raises:
      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occurred.
    """
    if http is None:
        http = self.http

    if self.resumable:
        body = None
        while body is None:
            _, body = self.next_chunk(http=http, num_retries=num_retries)
        return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
        self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    # Assume that a GET request never contains a request body.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
        self.method = 'POST'
        self.headers['x-http-method-override'] = 'GET'
        self.headers['content-type'] = 'application/x-www-form-urlencoded'
        parsed = urlparse(self.uri)
        self.uri = urlunparse(
            (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
             None)
        )
        self.body = parsed.query
        self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
        http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
        method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
        callback(resp)
    if resp.status >= 300:
        raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
def _serialize_request(self, request):
    """Convert an HttpRequest object into a string.

    Args:
      request: HttpRequest, the request to serialize.

    Returns:
      The request as a string in application/http format.
    """
    # Construct status line
    parsed = urlparse(request.uri)
    request_line = urlunparse(
        ('', '', parsed.path, parsed.params, parsed.query, '')
    )
    status_line = request.method + ' ' + request_line + ' HTTP/1.1\n'
    major, minor = request.headers.get('content-type', 'application/json').split('/')
    msg = MIMENonMultipart(major, minor)
    headers = request.headers.copy()

    if request.http is not None:
        credentials = _auth.get_credentials_from_http(request.http)
        if credentials is not None:
            _auth.apply_credentials(credentials, headers)

    # MIMENonMultipart adds its own Content-Type header.
    if 'content-type' in headers:
        del headers['content-type']

    for key, value in six.iteritems(headers):
        msg[key] = value
    msg['Host'] = parsed.netloc
    msg.set_unixfrom(None)

    if request.body is not None:
        msg.set_payload(request.body)
        msg['content-length'] = str(len(request.body))

    # Serialize the mime message.
    fp = StringIO()
    # maxheaderlen=0 means don't line wrap headers.
    g = Generator(fp, maxheaderlen=0)
    g.flatten(msg, unixfrom=False)
    body = fp.getvalue()

    return status_line + body
def start_fixture(self):
    """Set up config."""
    global LOAD_APP_KWARGS

    self.conf = None

    # Determine the database connection.
    db_url = os.environ.get('PIFPAF_URL', "sqlite://").replace(
        "mysql://", "mysql+pymysql://")
    if not db_url:
        raise case.SkipTest('No database connection configured')

    conf = self.conf = service.prepare_service([], [])
    opts.set_defaults(self.conf)

    content = ('{"default": ""}')
    if six.PY3:
        content = content.encode('utf-8')

    self.tempfile = fileutils.write_to_tempfile(content=content,
                                                prefix='policy',
                                                suffix='.json')
    conf.set_override("policy_file", self.tempfile,
                      group='oslo_policy')
    conf.set_override(
        'api_paste_config',
        os.path.abspath('etc/panko/api_paste.ini')
    )

    parsed_url = list(urlparse.urlparse(db_url))
    parsed_url[2] += '-%s' % uuidutils.generate_uuid(dashed=False)
    db_url = urlparse.urlunparse(parsed_url)

    conf.set_override('connection', db_url, group='database')

    if (parsed_url[0].startswith("mysql")
            or parsed_url[0].startswith("postgresql")):
        sqlalchemy_utils.create_database(conf.database.connection)

    self.conn = storage.get_connection_from_config(self.conf)
    self.conn.upgrade()

    LOAD_APP_KWARGS = {
        'conf': conf,
        'appname': 'panko+noauth',
    }
def url_for(self, resource='', query=None, params='', fragment='',
            keep_blank_values=None):
    """Generate url for resource

    Use endpoint for generation
    Ex. resource = 'one/two/three'
    result - http://localhost:5000/api/one/two/three/
    if endpoint == http://localhost:5000/api/

    :param resource: str
    :param query: dict for generate query string {a: 1, b: 2} -> ?a=1&b=2,
        or string
    :param params: params for last path url
    :param fragment: #fragment
    :return: str, url
    """
    parsed_url = list(urlparse.urlparse(self.endpoint))

    if resource:
        path = self.path + '/' + resource
    else:
        path = self.path

    if self.close_slash:
        if not path.endswith('/'):
            path += '/'

    if not params:
        params = self.params
    if not fragment:
        fragment = self.fragment

    parsed_url[2] = path
    parsed_url[3] = params
    parsed_url[5] = fragment

    if self.query:
        parsed_url[4] = urlencode(self.query, doseq=1)

    if query is not None:
        if keep_blank_values is None:
            keep_blank_values = self.keep_blank_values
        if isinstance(query, six.string_types):
            query = urlparse.parse_qs(query,
                                      keep_blank_values=keep_blank_values)
        req_query = dict(self.query)
        req_query.update(query)
        req_query = urlencode(req_query, doseq=1)
        parsed_url[4] = req_query

    url = urlparse.urlunparse(parsed_url)
    self.logger.debug('Url %s built for resource "%s"', url, resource)
    return url
def paginate(page=None, request=None, distance=2, edge=1, extra_class='',
             text_labels=True):
    paginator = page.paginator
    pages = calculate_pages(
        page.number, paginator.num_pages, distance=distance, edge=edge)

    prev_page_url = next_page_url = None
    result = []
    if request:
        parts = urlparse(request.get_full_path())
        params = parse_qs(parts.query)
        for page_num in pages:
            if not page_num:
                result.append((page_num, None))
            else:
                params['page'] = [str(page_num)]
                query = urlencode(params, doseq=True)
                url = urlunparse(parts[:4] + (query,) + parts[5:])
                result.append((page_num, url))
        if page.has_previous():
            params['page'] = page.previous_page_number()
            query = urlencode(params, doseq=True)
            prev_page_url = urlunparse(parts[:4] + (query,) + parts[5:])
        if page.has_next():
            params['page'] = page.next_page_number()
            query = urlencode(params, doseq=True)
            next_page_url = urlunparse(parts[:4] + (query,) + parts[5:])
    else:
        for page_num in pages:
            url = '?%s' % urlencode({'page': str(page_num)})
            result.append((page_num, url))
        if page.has_previous():
            prev_page_url = '?%s' % urlencode({
                'page': str(page.previous_page_number())
            })
        if page.has_next():
            next_page_url = '?%s' % urlencode({
                'page': str(page.next_page_number())
            })
    pages = result

    return {
        'current': page.number,
        'page': page,
        'pages': pages,
        'paginator': paginator,
        'next_page_url': next_page_url,
        'prev_page_url': prev_page_url,
        'extra_class': extra_class,
        'text_labels': text_labels,
    }
def _list_pagination(self, url, response_key=None, obj_class=None,
                     limit=None):
    """Retrieve a list of items.

    The Zun API is configured to return a maximum number of
    items per request, (FIXME: see Zun's api.max_limit option). This
    iterates over the 'next' link (pagination) in the responses,
    to get the number of items specified by 'limit'. If 'limit'
    is None this function will continue pagination until there are
    no more values to be returned.

    :param url: a partial URL, e.g. '/nodes'
    :param response_key: the key to be looked up in response
        dictionary, e.g. 'nodes'
    :param obj_class: class for constructing the returned objects.
    :param limit: maximum number of items to return. If None returns
        everything.
    """
    if obj_class is None:
        obj_class = self.resource_class

    if limit is not None:
        limit = int(limit)

    object_list = []
    object_count = 0
    limit_reached = False
    while url:
        resp, body = self.api.json_request('GET', url)
        data = self._format_body_data(body, response_key)
        for obj in data:
            object_list.append(obj_class(self, obj, loaded=True))
            object_count += 1
            if limit and object_count >= limit:
                # break the for loop
                limit_reached = True
                break

        # break the while loop and return
        if limit_reached:
            break

        url = body.get('next')
        if url:
            # NOTE(lucasagomes): We need to edit the URL to remove
            # the scheme and netloc
            url_parts = list(urlparse.urlparse(url))
            url_parts[0] = url_parts[1] = ''
            url = urlparse.urlunparse(url_parts)

    return object_list
def execute(self, http=None, num_retries=0):
    """Execute the request.

    Args:
      http: httplib2.Http, an http object to be used in place of the
            one the HttpRequest request object was constructed with.
      num_retries: Integer, number of times to retry with randomized
            exponential backoff. If all retries fail, the raised HttpError
            represents the last request. If zero (default), we attempt the
            request only once.

    Returns:
      A deserialized object model of the response body as determined
      by the postproc.

    Raises:
      googleapiclient.errors.HttpError if the response was not a 2xx.
      httplib2.HttpLib2Error if a transport error has occurred.
    """
    if http is None:
        http = self.http

    if self.resumable:
        body = None
        while body is None:
            _, body = self.next_chunk(http=http, num_retries=num_retries)
        return body

    # Non-resumable case.

    if 'content-length' not in self.headers:
        self.headers['content-length'] = str(self.body_size)
    # If the request URI is too long then turn it into a POST request.
    if len(self.uri) > MAX_URI_LENGTH and self.method == 'GET':
        self.method = 'POST'
        self.headers['x-http-method-override'] = 'GET'
        self.headers['content-type'] = 'application/x-www-form-urlencoded'
        parsed = urlparse(self.uri)
        self.uri = urlunparse(
            (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None,
             None)
        )
        self.body = parsed.query
        self.headers['content-length'] = str(len(self.body))

    # Handle retries for server-side errors.
    resp, content = _retry_request(
        http, num_retries, 'request', self._sleep, self._rand, str(self.uri),
        method=str(self.method), body=self.body, headers=self.headers)

    for callback in self.response_callbacks:
        callback(resp)
    if resp.status >= 300:
        raise HttpError(resp, content, uri=self.uri)
    return self.postproc(resp, content)
def temp_url(self, lifetime=300, method='GET', inline=True, filename=None):
    """Obtains a temporary URL to an object.

    Args:
        lifetime (int): The time (in seconds) the temporary
            URL will be valid
        method (str): The HTTP method that can be used on
            the temporary URL
        inline (bool, default True): If False, URL will have a
            Content-Disposition header that causes browser to download as
            attachment.
        filename (str, optional): A urlencoded filename to use for
            attachment, otherwise defaults to object name
    """
    global_options = settings.get()['swift']
    auth_url = global_options.get('auth_url')
    temp_url_key = global_options.get('temp_url_key')

    if not self.resource:
        raise ValueError('can only create temporary URL on object')
    if not temp_url_key:
        raise ValueError(
            'a temporary url key must be set with settings.update '
            'or by setting the OS_TEMP_URL_KEY environment variable')
    if not auth_url:
        raise ValueError(
            'an auth url must be set with settings.update '
            'or by setting the OS_AUTH_URL environment variable')

    obj_path = '/v1/%s' % self[len(self.drive):]
    # Generate the temp url using swifts helper. Note that this method is ONLY
    # useful for obtaining the temp_url_sig and the temp_url_expires parameters.
    # These parameters will be used to construct a properly-escaped temp url
    obj_url = generate_temp_url(obj_path, lifetime, temp_url_key, method)
    query_begin = obj_url.rfind('temp_url_sig', 0, len(obj_url))
    obj_url_query = obj_url[query_begin:]
    obj_url_query = dict(parse.parse_qsl(obj_url_query))

    query = ['temp_url_sig=%s' % obj_url_query['temp_url_sig'],
             'temp_url_expires=%s' % obj_url_query['temp_url_expires']]
    if inline:
        query.append('inline')
    if filename:
        query.append('filename=%s' % parse.quote(filename))

    auth_url_parts = parse.urlparse(auth_url)
    return parse.urlunparse((auth_url_parts.scheme,
                             auth_url_parts.netloc,
                             parse.quote(obj_path),
                             auth_url_parts.params,
                             '&'.join(query),
                             auth_url_parts.fragment))
def _list_pagination(self, url, response_key=None, obj_class=None,
                     limit=None):
    """Retrieve a list of items.

    The Iotronic API is configured to return a maximum number of
    items per request, (see Iotronic's api.max_limit option). This
    iterates over the 'next' link (pagination) in the responses,
    to get the number of items specified by 'limit'. If 'limit'
    is None this function will continue pagination until there are
    no more values to be returned.

    :param url: a partial URL, e.g. '/boards'
    :param response_key: the key to be looked up in response
        dictionary, e.g. 'boards'
    :param obj_class: class for constructing the returned objects.
    :param limit: maximum number of items to return. If None returns
        everything.
    """
    if obj_class is None:
        obj_class = self.resource_class

    if limit is not None:
        limit = int(limit)

    object_list = []
    object_count = 0
    limit_reached = False
    while url:
        resp, body = self.api.json_request('GET', url)
        data = self._format_body_data(body, response_key)
        for obj in data:
            object_list.append(obj_class(self, obj, loaded=True))
            object_count += 1
            if limit and object_count >= limit:
                # break the for loop
                limit_reached = True
                break

        # break the while loop and return
        if limit_reached:
            break

        url = body.get('next')
        if url:
            # NOTE(lucasagomes): We need to edit the URL to remove
            # the scheme and netloc
            url_parts = list(urlparse.urlparse(url))
            url_parts[0] = url_parts[1] = ''
            url = urlparse.urlunparse(url_parts)

    return object_list