# Example source code using urlparse() from the Python six.moves.urllib.parse module


def validate_config(self, organization, config, actor=None):
        """Fill in repository details on ``config`` from its ``url`` entry.

        When ``config`` has a truthy ``'url'``, the URL is split into the
        instance (its network location) and the repository name (the part of
        the path after ``_git/``); the repository is then fetched through the
        client and ``config`` is updated in place with the result.

        :param organization: not used by this method.
        :param config: dict of repository settings; mutated and returned.
        :param actor: forwarded to ``self.get_client``.
        :returns: the same ``config`` dict.
        """
        if config.get('url'):
            client = self.get_client(actor)
            # parse out the repo name and the instance
            parts = urlparse(config['url'])
            instance = parts.netloc
            name = parts.path.rsplit('_git/', 1)[-1]
            project = config.get('project') or name

            try:
                repo = client.get_repo(instance, name, project)
            except Exception as e:
                # NOTE(review): presumably raise_error always raises;
                # otherwise ``repo`` is unbound below -- confirm.
                self.raise_error(e, identity=client.auth)
            config.update({
                'instance': instance,
                'project': project,
                'name': repo['name'],
                'external_id': six.text_type(repo['id']),
                'url': repo['_links']['web']['href'],
            })
        return config
def _connect_request(self, vdi_ref):
        """Open an HTTP(S) request for exporting a VDI and return the response.

        Creates a task for the export, connects to the host named by
        ``self.host_url`` using the scheme found in that URL, and issues a
        GET for the VDI export path.

        :param vdi_ref: reference of the VDI to export.
        :returns: the response object from ``conn.getresponse()``.
        :raises Exception: any failure is logged and re-raised.
        """
        # request connection to xapi url service for VDI export
        try:
            # create task for VDI export
            label = 'VDI_EXPORT_for_' + self.instance['name']
            desc = 'Exporting VDI for instance: %s' % self.instance['name']
            self.task_ref = self.session.task.create(label, desc)
            LOG.debug("task_ref is %s" % self.task_ref)
            # connect to XS
            xs_url = urlparse.urlparse(self.host_url)
            if xs_url.scheme == 'http':
                conn = httplib.HTTPConnection(xs_url.netloc)
                LOG.debug("using http")
            elif xs_url.scheme == 'https':
                conn = httplib.HTTPSConnection(xs_url.netloc)
                LOG.debug("using https")
            vdi_export_path = utils.get_vdi_export_path(
                self.session, self.task_ref, vdi_ref)
            conn.request('GET', vdi_export_path)
            conn_resp = conn.getresponse()
        except Exception:
            LOG.debug('request connect for vdi export failed')
            # Re-raise: falling through would hit ``return conn_resp`` with
            # the name unbound and mask the real error.
            raise
        return conn_resp
def make_temp_file_path_from_url(temp_dir_path, url):
    """Build a temporary file path inside ``temp_dir_path`` from ``url``.

    The file name is the last component of the URL path.

    :param temp_dir_path: directory the temporary file should live in.
    :param url: URL whose path supplies the file name.
    :returns: ``temp_dir_path`` joined with the derived file name.
    :raises InvalidFilePathError: if ``url`` or ``temp_dir_path`` is not a
        string, or no usable file name can be derived from ``url``.
    """
    try:
        url_path = urlparse(url).path
    except AttributeError:
        raise InvalidFilePathError("url must be a string")

    if typepy.is_null_string(url_path):
        raise InvalidFilePathError("invalid URL path: {}".format(url_path))

    temp_name = os.path.basename(url_path.rstrip("/"))
    if typepy.is_null_string(temp_name):
        temp_name = pathvalidate.replace_symbol(
            temp_name, replacement_text="_")

    if typepy.is_null_string(temp_name):
        raise InvalidFilePathError("invalid URL: {}".format(url))

    try:
        return posixpath.join(temp_dir_path, temp_name)
    except (TypeError, AttributeError):
        raise InvalidFilePathError("temp_dir_path must be a string")
def get_filename(self, task, default_ext):
        """Set the path where the image will be saved.

        The default strategy is to use an increasing 6-digit number as
        the filename. You can override this method if you want to set custom
        naming rules. The file extension is kept if it can be obtained from
        the url, otherwise ``default_ext`` is used as extension.

        Args:
            task (dict): The task dict got from ``task_queue``.
            default_ext (str): Extension used when the URL carries none.

        Returns:
            Filename with extension.
        """
        url_path = urlparse(task['file_url'])[2]
        # Only inspect the last path segment, so a dot in a directory name
        # (e.g. ``/v1.0/image``) is not mistaken for a file extension.
        last_segment = url_path.rsplit('/', 1)[-1]
        if '.' in last_segment:
            extension = last_segment.rsplit('.', 1)[-1]
        else:
            extension = default_ext
        file_idx = self.fetched_num + self.file_idx_offset
        return '{:06d}.{}'.format(file_idx, extension)
def do_GET(self):
        """Answer the callback request and record its query parameters.

        The query string of ``self.path`` is parsed and stored on
        ``self.server.callback_result`` as a dict: a parameter given once
        maps to its single value, a parameter with no values maps to
        ``None``, and a repeated parameter maps to the list of its values.
        A short plain-text message is written back to the client.
        """
        # A status line and the end-of-headers marker are needed for a
        # well-formed HTTP response.
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        parts = urlparse(self.path)
        query = parse_qs(parts.query)

        result = {}
        for k, v in query.items():
            if len(v) == 1:
                result[k] = v[0]
            elif len(v) == 0:
                result[k] = None
            else:
                result[k] = v

        self.server.callback_result = result

        self.wfile.write(b'You can close this window now')
def sort_url_by_query_keys(url):
    """A helper function which sorts the keys of the query string of a url.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
       prevent non-deterministic ordering of the query string causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    # keep_blank_values=True so parameters with empty values are preserved
    queries = urlparse.parse_qsl(parsed.query, True)
    sorted_query = sorted(queries, key=lambda x: x[0])

    encoded_sorted_query = urlparse.urlencode(sorted_query, True)

    url_parts = (parsed.scheme, parsed.netloc, parsed.path,
                 parsed.params, encoded_sorted_query,
                 parsed.fragment)

    return urlparse.urlunparse(url_parts)
def strip_version(endpoint):
    """Strip version from the last component of endpoint if present.

    :param endpoint: endpoint URL as a string.
    :returns: tuple ``(endpoint, version)``; when the URL path is exactly a
        version such as ``v1`` or ``v2.0`` the endpoint is reduced to
        ``scheme://netloc`` and ``version`` is a float, otherwise the
        endpoint only loses its trailing slashes and ``version`` is ``None``.
    :raises ValueError: if ``endpoint`` is not a string.
    """
    # NOTE(flaper87): This shouldn't be necessary if
    # we make endpoint the first argument. However, we
    # can't do that just yet because we need to keep
    # backwards compatibility.
    if not isinstance(endpoint, six.string_types):
        raise ValueError("Expected endpoint")

    version = None
    # Get rid of trailing '/' if present
    endpoint = endpoint.rstrip('/')
    url_parts = parse.urlparse(endpoint)
    (scheme, netloc, path, __, __, __) = url_parts
    path = path.lstrip('/')
    # regex to match 'v1' or 'v2.0' etc. It is a raw string and anchored at
    # the end so that a path such as 'v2/images' is left alone instead of
    # making float() fail below.
    if re.match(r'v\d+\.?\d*$', path):
        version = float(path[1:])
        endpoint = scheme + '://' + netloc
    return endpoint, version
def validate_link(self, link, bookmark=False):
        """Checks if the given link can get correct data.

        :param link: URL to check.
        :param bookmark: when true, the link is rejected if its path starts
            with ``PATH_PREFIX``.
        :returns: ``True`` if ``self.get_json`` succeeds on the link's path
            and query, ``False`` otherwise.
        """
        # removes the scheme and net location parts of the link
        url_parts = list(urlparse.urlparse(link))
        url_parts[0] = url_parts[1] = ''

        # bookmark link should not have the version in the URL
        if bookmark and url_parts[2].startswith(PATH_PREFIX):
            return False

        full_path = urlparse.urlunparse(url_parts)
        try:
            self.get_json(full_path, path_prefix='')
            return True
        except Exception:
            return False
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            # A plain path: turn it into a file: URL so urlopen can read it.
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        return request.urlopen(definition_url).read().decode('utf8')
    except Exception:
        # Deliberate best effort: anything unreadable is treated as the
        # contents themselves.
        return contents_or_file_name
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
def __init__(self, url, protocols=None, agent=None):
        """Record the connection target described by ``url``.

        :param url: URL to connect to; scheme, host, port, path and query
            are taken from it.
        :param protocols: optional list of protocols; defaults to ``[]``.
        :param agent: optional user agent; defaults to
            ``constants.USER_AGENT``.
        :raises ValueError: if the port in ``url`` is not numeric.
        """
        self.url = url
        self.protocols = protocols or []
        self.agent = agent or constants.USER_AGENT

        self._headers = []

        _url = urlparse(url)
        self.scheme = _url.scheme
        host, _, port = _url.netloc.partition(':')
        if not port:
            # No explicit port: fall back on the scheme's default.
            port = '443' if self.scheme == 'wss' else '80'
        if not port.isdigit():
            raise ValueError('illegal port value')
        port = int(port)

        self.host = host
        self.port = port
        self._host_port = "{}:{}".format(host, port)
        self.resource = _url.path or '/'
        if _url.query:
            self.resource = "{}?{}".format(self.resource, _url.query)

        self.state = self.State()
def test_redirects_without_credentials(self):
        """A view called without credentials answers with a redirect.

        The redirect location must carry the originally requested path as
        its ``return_url`` query parameter.
        """
        request = self.factory.get('/test')
        request.session = self.session

        # NOTE(review): a decorator line was lost here; presumably it is the
        # OAuth "required" decorator, since the undecorated view could never
        # produce the redirect asserted below -- confirm.
        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse('test')  # pragma: NO COVER

        response = test_view(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        self.assertIn(
            'return_url=%2Ftest', parse.urlparse(response['Location']).query)

def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
# Derive an image URL from ``text``; falls back to the stripped text itself
# when no rewritten URL is produced.
# NOTE(review): the right-hand sides of several ``match =`` / ``path =``
# assignments below are missing, so this block does not parse as shown.
# The lost expressions must be restored from the original project.
def extract_image_url(text):
    text = _strip_url(text)
    imgurl = None
    if text:
        # check if the text is style content
        match =
        text = match.groups()[0] if match else text
        parsed = urlparse(text)
        path = None
        match =
        if match:
            path =
        elif parsed.query:
            match =
            if match:
                path =
        if path is not None:
            # rebuild the URL with the replacement path (index 2 is the path)
            parsed = list(parsed)
            parsed[2] = path
            imgurl = urlunparse(parsed)
        if not imgurl:
            imgurl = text
    return imgurl
def _fix_esx_url(url, host, port):
        """Fix netloc in the case of an ESX host.

        In the case of an ESX host, the netloc is set to '*' in the URL
        returned in HttpNfcLeaseInfo. It should be replaced with host name
        or IP address.

        :param url: URL to fix.
        :param host: host name or IP address to substitute.
        :param port: port number to substitute.
        :returns: ``url`` unchanged unless its netloc is ``'*'``.
        """
        urlp = urlparse.urlparse(url)
        if urlp.netloc == '*':
            scheme, netloc, path, params, query, fragment = urlp
            if netutils.is_valid_ipv6(host):
                # IPv6 literals must be bracketed inside a netloc
                netloc = '[%s]:%d' % (host, port)
            else:
                netloc = "%s:%d" % (host, port)
            url = urlparse.urlunparse((scheme,
                                       netloc,
                                       path,
                                       params,
                                       query,
                                       fragment))
        return url
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
def _decorate_request(self, filters, method, url, headers=None, body=None,
                      auth_data=None):
        """Return an authenticated copy of the request pieces.

        :param filters: not used by this method.
        :param method: not used by this method.
        :param url: request URL; it also serves as the base URL.
        :param headers: optional dict of headers; it is copied, not mutated.
        :param body: request body, returned unchanged.
        :param auth_data: ``(token, <ignored>)`` pair; defaults to
            ``self.auth_data``.
        :returns: tuple ``(url, headers, body)`` where ``headers`` carries
            the ``X-Auth-Token`` header.
        """
        if auth_data is None:
            auth_data = self.auth_data
        token, _ = auth_data
        #base_url = self.base_url(filters=filters, auth_data=auth_data)
        base_url = url
        # build authenticated request
        # returns new request, it does not touch the original values
        _headers = copy.deepcopy(headers) if headers is not None else {}
        _headers['X-Auth-Token'] = str(token)
        if url is None or url == "":
            _url = base_url
        else:
            # Join base URL and url, and remove multiple contiguous slashes
            _url = "/".join([base_url, url])
            parts = [x for x in urlparse.urlparse(_url)]
            parts[2] = re.sub("/{2,}", "/", parts[2])
            _url = urlparse.urlunparse(parts)
        # no change to method or body
        return str(_url), _headers, body
# Lists nodes found under the path of ``self.context_override.url`` and
# returns ``node``.
# NOTE(review): several expressions below were truncated (for example the
# right-hand side of ``nodes =`` and the calls inside the loop), and at
# least one ``else:`` line appears to be missing, so this block does not
# parse as shown. Restore it from the original project.
def visit_ls(self, node, children):
        path = urlparse(self.context_override.url).path
        # drop the empty segments produced by leading/trailing slashes
        path = filter(None, path.split('/'))
        nodes =*path)
        if self.output.isatty():
            names = []
            for node in nodes:
                token_type = String if'type') == 'dir' else Name
                name = self._colorize(, token_type)
            lines = list(colformat(list(names)))
            lines = [ for n in nodes]
        if lines:
        return node
def _drop_db(self):
        """Build a ``mysql`` command that drops and recreates the database.

        The database name, user, password and host are taken from
        ``self.url``.
        """
        # NOTE(review): the command string is built but never executed in the
        # visible code; the line that runs it appears to be missing.
        addr = urlparse.urlparse(self.url)
        database = addr.path.strip('/')
        # netloc is split as "<user>[:<password>]@<host>"
        loc_pieces = addr.netloc.split('@')
        host = loc_pieces[1]
        auth_pieces = loc_pieces[0].split(':')
        user = auth_pieces[0]
        password = ""
        if len(auth_pieces) > 1:
            if auth_pieces[1].strip():
                password = "-p\"%s\"" % auth_pieces[1]
        sql = ("drop database if exists %(database)s; create "
               "database %(database)s;") % {'database': database}
        cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
               "-e \"%(sql)s\"") % {'user': user, 'password': password,
                                    'host': host, 'sql': sql}
def get_connection_params(endpoint, **kwargs):
        """Work out how to connect to ``endpoint``.

        :param endpoint: endpoint URL; its scheme must be http or https.
        :param kwargs: optional ``timeout`` (seconds, default 600) and, for
            https, ``ca_file``, ``cert_file``, ``key_file`` and ``insecure``.
        :returns: tuple ``(connection_class, args, kwargs)`` where ``args``
            is ``(hostname, port, path)``.
        :raises exceptions.EndpointException: for any other scheme.
        """
        parts = urlparse.urlparse(endpoint)

        # trim API version and trailing slash from endpoint
        path = parts.path.rstrip('/')
        # Remove the version as a suffix. str.rstrip() takes a *set* of
        # characters and would also eat unrelated trailing characters that
        # happen to appear in API_VERSION.
        if API_VERSION and path.endswith(API_VERSION):
            path = path[:-len(API_VERSION)]

        _args = (parts.hostname, parts.port, path)
        _kwargs = {'timeout': (float(kwargs.get('timeout'))
                               if kwargs.get('timeout') else 600)}

        if parts.scheme == 'https':
            _class = VerifiedHTTPSConnection
            _kwargs['ca_file'] = kwargs.get('ca_file', None)
            _kwargs['cert_file'] = kwargs.get('cert_file', None)
            _kwargs['key_file'] = kwargs.get('key_file', None)
            _kwargs['insecure'] = kwargs.get('insecure', False)
        elif parts.scheme == 'http':
            _class = six.moves.http_client.HTTPConnection
        else:
            msg = 'Unsupported scheme: %s' % parts.scheme
            raise exceptions.EndpointException(msg)

        return (_class, _args, _kwargs)
def test_redirects_without_credentials(self):
        """A view called without credentials answers with a redirect.

        The redirect location must carry the originally requested path as
        its ``return_url`` query parameter.
        """
        request = self.factory.get('/test')
        request.session = self.session

        # NOTE(review): a decorator line was lost here; presumably it is the
        # OAuth "required" decorator, since the undecorated view could never
        # produce the redirect asserted below -- confirm.
        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse('test')  # pragma: NO COVER

        response = test_view(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        self.assertIn(
            'return_url=%2Ftest', parse.urlparse(response['Location']).query)

def test_absolute_url(self):
        """
        Does the server return absolute URIs?

        Relies on the root returning a ``self`` object.
        """

        response = requests.get(self.root_url)

        resource = response.json().get('href')
        assert resource is not None

        href = urlparse(resource)
        # Test in decreasing order of likely correctness:
        # Path => Domain => Scheme
        assert href.path == '/'
        # Get the origin name, minus the scheme.
        assert href.netloc == urlparse(self.origin).netloc
        assert href.scheme == 'http'

        resource = response.json().get('buckets').get('1.0')
        href = urlparse(resource)
        assert href.path.endswith('buckets/1.0')
def add_capability(self, name, uri):
        """
        Add a capability of the RETS board
        :param name: The name of the capability
        :param uri: The capability URI given by the RETS board
        :return: None
        :raises ValueError: if ``uri`` is relative and no ``Login`` URL is
            stored to resolve it against
        """

        parse_results = urlparse(uri)
        if parse_results.hostname is None:
            # relative URL given, so build this into an absolute URL
            login_url = self.capabilities.get('Login')
            if not login_url:
                logger.error("There is no login URL stored, so additional capabilities cannot be added.")
                raise ValueError("Cannot automatically determine absolute path for {0!s} given.".format(uri))

            parts = urlparse(login_url)
            # Keep an explicit port from the login URL; dropping it would
            # point the capability at a different server.
            port = ':{0}'.format(parts.port) if parts.port else ''
            uri = parts.scheme + '://' + parts.hostname + port + '/' + uri.lstrip('/')

        self.capabilities[name] = uri
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
def _get_proxy_config(self):
        """
        Determine if self._url should be passed through a proxy. If so, return
        the appropriate proxy_server and proxy_port. Assumes https_proxy is used
        when ssl_enabled=True.

        :returns: tuple ``(proxy_server, proxy_port)``; both are ``None``
            when no proxy applies.
        """

        proxy_server = None
        proxy_port = None
        ssl_enabled = self._ssl_enabled

        if ssl_enabled:
            proxy = os.environ.get("https_proxy")
        else:
            proxy = os.environ.get("http_proxy")
        no_proxy = os.environ.get("no_proxy")
        # substring test: the server is exempt if it appears in no_proxy
        no_proxy_url = no_proxy and self._server in no_proxy

        if proxy and not no_proxy_url:
            p = urlparse(proxy)
            proxy_server = p.hostname
            proxy_port = p.port

        return proxy_server, proxy_port
# Builds the first row of the browser: a label showing the path of
# ``base_url`` next to a Select widget, stored together in ``self.elts``.
# NOTE(review): the remaining arguments and the closing parenthesis of the
# ``widgets.Select(`` call are missing, so this block does not parse as
# shown. Restore them from the original project.
def __init__(self, base_url='', **layout_kwargs):
        self.base_url = base_url
        self.selected_url = None
        # default label width unless the caller supplied one
        if 'min_width' not in layout_kwargs:
            layout_kwargs['min_width'] = '30%'
        self.label_layout = Layout(**layout_kwargs)

        dd = widgets.Select(
            description='', #urlparse(base_url).path,

        # re-enter on_value_change whenever the selection changes
        dd.observe(partial(self.on_value_change, url=self.base_url), names='value')
        lbl = widgets.Label(urlparse(self.base_url).path, layout=self.label_layout)
        hbox = widgets.HBox([lbl, dd])
        self.elts = [hbox, lbl, dd]
# Reacts to a new selection: a URL ending in '.grb' is handed to
# ``self.select_url``; otherwise the current widgets are closed and a new
# label/Select row is built for the chosen URL.
# NOTE(review): several lines are missing here (the bodies of the
# ``if next_url is None:`` and ``if not links:`` branches, part of the
# ``self.get_links(`` call and the arguments of ``widgets.Select(``), so
# this block does not parse as shown.
def on_value_change(self, change, url):
        next_url = change['new']
        if next_url is None: # 'Select...' chosen
        if next_url.endswith('.grb'): # File reached
            return self.select_url(next_url)
        [w.close() for w in self.elts]
        links = self.get_links(next_url,
                                            if next_url == self.base_url
                                            else self.dir_or_grib))
        if not links:
        next_dd = widgets.Select(
            description='', #urlparse(url).path,
        next_dd.observe(partial(self.on_value_change, url=next_url), names='value')
        lbl = widgets.Label(urlparse(next_url).path, layout=self.label_layout)
        hbox = widgets.HBox([lbl, next_dd])
        self.elts = [hbox, lbl, next_dd]
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
def validate_href(self, image_href):
        """Check that a swift resource reference points at a reachable object.

        :param image_href: reference like ``swift:container/path``.
        :returns: tuple ``(container, object_name, headers)`` where
            ``headers`` is the result of ``self.client.head_object``.
        :raises exception.ImageRefValidationFailed: if the reference has no
            path, or the object cannot be fetched.
        """
        path = urlparse.urlparse(image_href).path.lstrip('/')
        if not path:
            raise exception.ImageRefValidationFailed(
                _("No path specified in swift resource reference: %s. "
                  "Reference must be like swift:container/path")
                % str(image_href))

        # first path segment is the container, the rest names the object
        container, _sep, object_name = path.partition('/')
        try:
            headers = self.client.head_object(container, object_name)
        except exception.SwiftOperationError as e:
            raise exception.ImageRefValidationFailed(
                _("Cannot fetch %(url)s resource. %(exc)s") %
                dict(url=str(image_href), exc=str(e)))

        return (container, object_name, headers)
# Validates an rsync resource reference by running ``rsync`` in dry-run
# mode through ``utils.execute``.
# NOTE(review): the ``try:`` line and the tail of the ``utils.execute(``
# call (remaining arguments and closing parenthesis) are missing, so this
# block does not parse as shown. Restore them from the original project.
def validate_href(self, image_href):
        path = urlparse.urlparse(image_href).path.lstrip('/')
        if not path:
            raise exception.InvalidParameterValue(
                _("No path specified in rsync resource reference: %s. "
                  "Reference must be like rsync:host::module/path")
                % str(image_href))
            stdout, stderr = utils.execute(
                'rsync', '--stats', '--dry-run',
            return path, stdout, stderr

        except (processutils.ProcessExecutionError, OSError) as ex:
            raise exception.ImageRefValidationFailed(
                _("Cannot fetch %(url)s resource. %(exc)s") %
                dict(url=str(image_href), exc=str(ex)))
def get_glance_image_uuid_name(task, url):
    """Converting glance links.

    A link that is either scheme-less or uses the ``glance`` scheme is
    looked up through the glance client and converted to the tuple
        uuid, name

    :param task: task whose ``context`` is used to build the glance client.
    :param url: image reference to resolve.
    :returns: tuple ``(id, name)`` taken from the image info.
    :raises exception.InvalidImageRef: if ``url`` has a scheme other than
        ``glance``.
    """
    urlobj = urlparse.urlparse(url)
    if urlobj.scheme and urlobj.scheme != 'glance':
        raise exception.InvalidImageRef("Only glance images are supported.")

    # 'glance:ref' parses into path, 'glance://ref' into netloc
    path = urlobj.path or urlobj.netloc
    img_info = _get_glanceclient(task.context).show(path)
    return img_info['id'], img_info['name']
def url_download_raw_secured(context, node, url):
    """Download raw contents of the URL bypassing cache and temp files."""
    # NOTE(review): the body of ``if not url:`` and the statement after
    # ``try:`` were lost (only ``, out)`` survives) and no ``except`` or
    # ``finally`` clause is visible, so this block does not parse as shown.
    if not url:
    url = url.rstrip('/')
    url = append_storage_prefix(node, url)

    # only these schemes are accepted; anything else is rejected below
    scheme = parse.urlparse(url).scheme
    sources_with_tenant_isolation = ('glance', 'swift')
    if scheme not in sources_with_tenant_isolation:
        raise bareon_exception.UnsafeUrlError(
            details="Use one of the following storages "
                    "for this resource: %s"
                    % str(sources_with_tenant_isolation))

    resource_storage = image_service.get_image_service(
        url, context=context)
    out = StringIO.StringIO()
    try:, out)
        return out.getvalue()
def __init__(self, **kw):
        """Set up the spider's start URL, allowed domain and counters.

        :param kw: keyword arguments forwarded to the parent initializer.
        """
        super(FollowAllSpider, self).__init__(**kw)
        start = 'http://localhost/'
        # prepend a scheme when the start URL does not carry one
        if not start.startswith(('http://', 'https://')):
            start = 'http://%s/' % start
        self.url = start
        hostname = urlparse(start).hostname
        # crawl the bare domain, without a leading "www."
        self.allowed_domains = [re.sub(r'^www\.', '', hostname)]
        self.link_extractor = LinkExtractor()
        self.cookies_seen = set()
        self.previtem = 0
        self.items = 0
        self.timesec = datetime.datetime.utcnow()
def __init__(self, requirement_string):
        """Parse ``requirement_string`` and populate the requirement fields.

        Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker``.

        :param requirement_string: the requirement text to parse.
        :raises InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL lacking a scheme or a network location.
        """
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            # Show a short window of the input starting at the failure point.
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
def params(arg):
    """
    Parse `arg` and returns a dict describing the TPDA

    Resolution order:
        - `arg` is a string is parsed as an URL
        - `arg` has an items() method and is parsed
           (works for multidicts, ..)
        - `arg` is a iterable is parsed as a
           ((name, value), (name, value), ..)

    :raises exceptions.IncompleteURI: if `arg` is a URL string without a
        query string.
    """
    if isinstance(arg, str):
        pr = parse.urlparse(arg)
        if not pr.query:
            raise exceptions.IncompleteURI
        return Params(parse.parse_qsl(pr.query)).params()
    elif hasattr(arg, 'items'):
        return Params(arg.items()).params()
    elif hasattr(arg, '__iter__'):
        return Params(arg).params()
def media_for(hash_key, scheme=None, style=None):
    """Build the media URL for ``hash_key``.

    Returns ``None`` for a falsy key. A key whose host is set and not listed
    in ``Config.QINIU_DOMAINS`` is returned as given; otherwise the URL is
    rebuilt on ``Config.QINIU_HOST``, '.amr' is swapped for '.mp3' and
    ``style`` is appended after a '!'. ``scheme`` is not used.
    """
    if not hash_key:
        return None
    # anything after the first '!' is dropped before parsing
    url = hash_key.split('!')[0]
    up = urlparse(url)
    hash_domain = up.hostname
    if hash_domain and hash_domain not in Config.QINIU_DOMAINS:
        # NOTE(review): this comparison can never be true, because the
        # enclosing condition requires hash_domain to be truthy; the domain
        # literal was presumably lost -- confirm against the original.
        if hash_domain == '':
            hash_key = hash_key.replace('http://', 'https://')
        return hash_key
    _hash = up.path
    if len(_hash) != 0 and _hash[0] == '/':
        _hash = _hash[1:]

    media_host = Config.QINIU_HOST
    url = '%s/%s' % (media_host, _hash)
    if url.endswith('.amr'):
        url = '%s.mp3' % url[:-4]
    if url and style:
        url = '%s!%s' % (url, style)
    return url
# Creates an empty object in a swift container and builds a temporary URL
# for it.
# NOTE(review): several lines are missing (the call that stores the temp-URL
# key, the tail of the ``generate_temp_url(`` call, and any return
# statement), so this block does not parse as shown. Restore them from the
# original project.
def create_temp_url(swift_client, name, timeout, container=None):

    # default container name is "<name>-<random uuid>"
    container = container or '%(name)s-%(uuid)s' % {
        'name': name, 'uuid': uuid.uuid4()}
    object_name = str(uuid.uuid4())

    key_header = 'x-account-meta-temp-url-key'
    if key_header not in swift_client.head_account():
            key_header: six.text_type(uuid.uuid4())[:32]})

    key = swift_client.head_account()[key_header]
    project_path = swift_client.url.split('/')[-1]
    path = '/v1/%s/%s/%s' % (project_path, container, object_name)
    # timeout is multiplied by 60 before being passed on
    timeout_secs = timeout * 60
    tempurl = swiftclient_utils.generate_temp_url(path, timeout_secs, key,
    sw_url = urlparse.urlparse(swift_client.url)
    put_url = '%s://%s%s' % (sw_url.scheme, sw_url.netloc, tempurl)
    swift_client.put_object(container, object_name, '')
# Project: eclcli    Author: nttcom    | project source | file source
def _pagination(self, collection, path, **params):
        """Yield successive pages of ``collection`` fetched from ``path``.

        After each page, the response's ``<collection>_links`` entry is
        searched for a link whose ``rel`` is ``'next'`` (or ``'previous'``
        when ``page_reverse`` is set); its query string becomes the
        parameters of the following request. Iteration stops when no such
        link exists.

        :param collection: collection name used to find the links entry.
        :param path: request path passed to ``self.get``.
        :param params: query parameters for the first request.
        """
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        has_more = True
        while has_more:
            res = self.get(path, params=params)
            yield res
            has_more = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        has_more = True
                        break
            except KeyError:
                # no links entry in the response: this was the last page
                break
def _pagination(self, collection, path, **params):
        """Yield successive pages of ``collection`` fetched from ``path``.

        After each page, the response's ``<collection>_links`` entry is
        searched for a link whose ``rel`` is ``'next'`` (or ``'previous'``
        when ``page_reverse`` is set); its query string becomes the
        parameters of the following request. Iteration stops when no such
        link exists.

        :param collection: collection name used to find the links entry.
        :param path: request path passed to ``self.get``.
        :param params: query parameters for the first request.
        """
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        has_more = True
        while has_more:
            res = self.get(path, params=params)
            yield res
            has_more = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        has_more = True
                        break
            except KeyError:
                # no links entry in the response: this was the last page
                break
def _pagination(self, collection, path, **params):
        """Yield successive pages of ``collection`` fetched from ``path``.

        After each page, the response's ``<collection>_links`` entry is
        searched for a link whose ``rel`` is ``'next'`` (or ``'previous'``
        when ``page_reverse`` is set); its query string becomes the
        parameters of the following request. Iteration stops when no such
        link exists.

        :param collection: collection name used to find the links entry.
        :param path: request path passed to ``self.get``.
        :param params: query parameters for the first request.
        """
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        has_more = True
        while has_more:
            res = self.get(path, params=params)
            yield res
            has_more = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        has_more = True
                        break
            except KeyError:
                # no links entry in the response: this was the last page
                break
def _get_node_ip_heartbeat(task):
    """Return the host part of the node's ``agent_url``.

    :param task: task whose ``node.driver_internal_info`` may hold
        ``agent_url``.
    :returns: host name or IP address from that URL, or ``''`` when the URL
        is missing or has no host.
    """
    callback_url = task.node.driver_internal_info.get('agent_url', '')
    # ``hostname`` copes with bracketed IPv6 literals and credentials, which
    # splitting the netloc on ':' does not.
    return urlparse.urlparse(callback_url).hostname or ''
# Project: ironic-staging-drivers    Author: openstack    | project source | file source
def _parse_root_device_hints(node):
    """Convert string with hints to dict.

    :param node: node whose ``root_device`` property holds the hints.
    :returns: dict of root device hints; empty if the node has none.
    :raises exception.InvalidParameterValue: if the hints fail validation
        or use operators other than ``==`` and ``s==``.
    """
    root_device = node.properties.get('root_device')
    if not root_device:
        return {}
    try:
        parsed_hints = irlib_utils.parse_root_device_hints(root_device)
    except ValueError as e:
        raise exception.InvalidParameterValue(
            _('Failed to validate the root device hints for node %(node)s. '
              'Error: %(error)s') % {'node': node.uuid, 'error': e})
    root_device_hints = {}
    advanced = {}
    for hint, value in parsed_hints.items():
        if isinstance(value, six.string_types):
            if value.startswith('== '):
                # numeric equality: keep the number
                root_device_hints[hint] = int(value[3:])
            elif value.startswith('s== '):
                # string equality: the value is URL-quoted
                root_device_hints[hint] = urlparse.unquote(value[4:])
            else:
                advanced[hint] = value
        else:
            root_device_hints[hint] = value
    if advanced:
        raise exception.InvalidParameterValue(
            _('Ansible-deploy does not support advanced root device hints '
              'based on oslo.utils operators. '
              'Present advanced hints for node %(node)s are %(hints)s.') % {
                  'node': node.uuid, 'hints': advanced})
    return root_device_hints
def _prepare_variables(task):
    """Collect the variables describing the node's image and config drive.

    :param task: task whose ``node.instance_info`` is read.
    :returns: dict with an ``image`` entry and, when available,
        ``configdrive`` and ``root_device_hints`` entries.
    """
    node = task.node
    i_info = node.instance_info
    image = {}
    for i_key, i_value in i_info.items():
        if i_key.startswith('image_'):
            # drop the 'image_' prefix
            image[i_key[6:]] = i_value
    image['mem_req'] = _calculate_memory_req(task)

    checksum = image.get('checksum')
    if checksum:
        # NOTE(pas-ha) checksum can be in <algo>:<checksum> format
        # as supported by various Ansible modules, mostly good for
        # standalone Ironic case when instance_info is populated manually.
        # With no <algo> we take that instance_info is populated from Glance,
        # where API reports checksum as MD5 always.
        if ':' not in checksum:
            image['checksum'] = 'md5:%s' % checksum
    variables = {'image': image}
    configdrive = i_info.get('configdrive')
    if configdrive:
        if urlparse.urlparse(configdrive).scheme in ('http', 'https'):
            cfgdrv_type = 'url'
            cfgdrv_location = configdrive
        else:
            # not a URL: treat the value as the content and store it locally
            cfgdrv_location = _get_configdrive_path(node.uuid)
            with open(cfgdrv_location, 'w') as f:
                f.write(configdrive)
            cfgdrv_type = 'file'
        variables['configdrive'] = {'type': cfgdrv_type,
                                    'location': cfgdrv_location}

    root_device_hints = _parse_root_device_hints(node)
    if root_device_hints:
        variables['root_device_hints'] = root_device_hints

    return variables
def get_addon_host_ident():
    """Return the host name of the ``system.url-prefix`` option.

    Returns ``''`` when that host is ``localhost``, empty or missing.
    """
    ident = urlparse(options.get('system.url-prefix')).hostname
    # NOTE(review): the tuple lists '' twice; one entry was presumably a
    # different literal originally -- confirm against the original project.
    if ident in ('localhost', '', None, ''):
        return ''
    return ident
def base_url(url):
    """Return the ``scheme://netloc`` prefix of ``url``."""
    scheme, netloc = urlparse(url)[:2]
    return '%s://%s' % (scheme, netloc)
# Project: sentry-plugins    Author: getsentry    | project source | file source
# NOTE(review): only the first statement of this method is visible; the rest
# of the body (including whatever it returns) is missing from this listing.
def from_request(cls, request):
        # network location (host[:port]) of the request URL
        host = urlparse(request.url).netloc
# Project: placebo    Author: huseyinyilmaz    | project source | file source
def get_decorator(placebo):
    """Build an httmock decorator that answers requests with ``placebo``.

    :param placebo: object providing ``_get_url``, ``_get_method``,
        ``_get_status``, ``_get_body`` and ``_get_headers``.
    :returns: the result of ``httmock.with_httmock`` for the mock handler.
    """
    url = placebo._get_url()

    if isinstance(url, six.string_types):
        url = parse.urlparse(url)

    # TODO should we remove empty parts of url?
    match_kwargs = {
        'scheme': url.scheme,
        'netloc': url.netloc,
        'path': url.path,
        'method': placebo._get_method(),
        'query': url.query
    }

    # only match on the parts that are actually set
    match_kwargs = {k: v
                    for k, v in match_kwargs.items()
                    if v}

    # NOTE(review): the decorator line was lost; ``match_kwargs`` is
    # otherwise unused, so presumably it feeds httmock's URL matcher --
    # confirm against the original project.
    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        url = parse.urlparse(url.geturl())
        # if body is empty httmock returns None
        # but we want ot to be always string.
        body = request.body or ''
        headers = request.headers
        return {'status_code': placebo._get_status(url, headers, body),
                'content': placebo._get_body(url, headers, body),
                'headers': placebo._get_headers(url, headers, body)}

    return httmock.with_httmock(mock_response)
def __init__(self, url, headers, body):
        """Store the request URL (as text and parsed), headers and body.

        :param url: an already parsed URL result or a URL string.
        :param headers: request headers, stored as given.
        :param body: request body, stored as given.
        """
        parsed_types = (parse.ParseResult, parse.SplitResult)
        if isinstance(url, parsed_types):
            # already parsed: recover the text form from it
            self.url = url.geturl()
            self.parsed_url = url
        elif isinstance(url, six.string_types):
            self.url = url
            self.parsed_url = parse.urlparse(url)

        self.headers = headers
        self.body = body
def get_decorator(placebo):
    """Build an httmock decorator that answers requests with ``placebo``.

    :param placebo: object providing ``_get_url``, ``_get_method``,
        ``_get_status``, ``_get_body`` and ``_get_headers``.
    :returns: the result of ``httmock.with_httmock`` for the mock handler.
    """
    url = placebo._get_url()

    if isinstance(url, six.string_types):
        url = parse.urlparse(url)

    # TODO should we remove empty parts of url?
    match_kwargs = {
        'scheme': url.scheme,
        'netloc': url.netloc,
        'path': url.path,
        'method': placebo._get_method(),
        'query': url.query
    }

    # only match on the parts that are actually set
    match_kwargs = {k: v
                    for k, v in match_kwargs.items()
                    if v}

    # NOTE(review): the decorator line was lost; ``match_kwargs`` is
    # otherwise unused, so presumably it feeds httmock's URL matcher --
    # confirm against the original project.
    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        url = parse.urlparse(url.geturl())
        # if body is empty httmock returns None
        # but we want ot to be always string.
        body = request.body or ''
        headers = request.headers
        return {'status_code': placebo._get_status(url, headers, body),
                'content': placebo._get_body(url, headers, body),
                'headers': placebo._get_headers(url, headers, body)}

    return httmock.with_httmock(mock_response)