Python six.moves.urllib.parse 模块,urlparse() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.urllib.parse.urlparse()

项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def validate_config(self, organization, config, actor=None):
        if config.get('url'):
            client = self.get_client(actor)
            # parse out the repo name and the instance
            parts = urlparse(config['url'])
            instance = parts.netloc
            name = parts.path.rsplit('_git/', 1)[-1]
            project = config.get('project') or name

            try:
                repo = client.get_repo(instance, name, project)
            except Exception as e:
                self.raise_error(e, identity=client.auth)
            config.update({
                'instance': instance,
                'project': project,
                'name': repo['name'],
                'external_id': six.text_type(repo['id']),
                'url': repo['_links']['web']['href'],
            })
        return config
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def _connect_request(self, vdi_ref):
        # request connection to xapi url service for VDI export
        try:
            # create task for VDI export
            label = 'VDI_EXPORT_for_' + self.instance['name']
            desc = 'Exporting VDI for instance: %s' % self.instance['name']
            self.task_ref = self.session.task.create(label, desc)
            LOG.debug("task_ref is %s" % self.task_ref)
            # connect to XS
            xs_url = urlparse.urlparse(self.host_url)
            if xs_url.scheme == 'http':
                conn = httplib.HTTPConnection(xs_url.netloc)
                LOG.debug("using http")
            elif xs_url.scheme == 'https':
                conn = httplib.HTTPSConnection(xs_url.netloc)
                LOG.debug("using https")
            vdi_export_path = utils.get_vdi_export_path(
                self.session, self.task_ref, vdi_ref)
            conn.request('GET', vdi_export_path)
            conn_resp = conn.getresponse()
        except Exception:
            LOG.debug('request connect for vdi export failed')
            raise
        return conn_resp
项目:pytablereader    作者:thombashi    | 项目源码 | 文件源码
def make_temp_file_path_from_url(temp_dir_path, url):
    try:
        url_path = urlparse(url).path
    except AttributeError:
        raise InvalidFilePathError("url must be a string")

    if typepy.is_null_string(url_path):
        raise InvalidFilePathError("invalid URL path: {}".format(url_path))

    temp_name = os.path.basename(url_path.rstrip("/"))
    if typepy.is_null_string(temp_name):
        temp_name = pathvalidate.replace_symbol(
            temp_name, replacement_text="_")

    if typepy.is_null_string(temp_name):
        raise InvalidFilePathError("invalid URL: {}".format(url))

    try:
        return posixpath.join(temp_dir_path, temp_name)
    except (TypeError, AttributeError):
        raise InvalidFilePathError("temp_dir_path must be a string")
项目:icrawler    作者:hellock    | 项目源码 | 文件源码
def get_filename(self, task, default_ext):
        """Set the path where the image will be saved.

        The default strategy is to use an increasing 6-digit number as
        the filename. You can override this method if you want to set custom
        naming rules. The file extension is kept if it can be obtained from
        the url, otherwise ``default_ext`` is used as extension.

        Args:
            task (dict): The task dict got from ``task_queue``.

        Output:
            Filename with extension.
        """
        url_path = urlparse(task['file_url'])[2]
        extension = url_path.split('.')[-1] if '.' in url_path else default_ext
        file_idx = self.fetched_num + self.file_idx_offset
        return '{:06d}.{}'.format(file_idx, extension)
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        parts = urlparse(self.path)
        query = parse_qs(parts.query)

        result = {}
        for k, v in query.items():
            if len(v) == 1:
                result[k] = v[0]
            elif len(v) == 0:
                result[k] = None
            else:
                result[k] = v

        self.server.callback_result = result

        self.wfile.write(b'You can close this window now')
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def sort_url_by_query_keys(url):
    """A helper function which sorts the keys of the query string of a url.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
       prevent non-deterministic ordering of the query string causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    parsed = urlparse.urlparse(url)
    queries = urlparse.parse_qsl(parsed.query, True)
    sorted_query = sorted(queries, key=lambda x: x[0])

    encoded_sorted_query = urlparse.urlencode(sorted_query, True)

    url_parts = (parsed.scheme, parsed.netloc, parsed.path,
                 parsed.params, encoded_sorted_query,
                 parsed.fragment)

    return urlparse.urlunparse(url_parts)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def strip_version(endpoint):
    """Strip version from the last component of endpoint if present."""
    # NOTE(flaper87): This shouldn't be necessary if
    # we make endpoint the first argument. However, we
    # can't do that just yet because we need to keep
    # backwards compatibility.
    if not isinstance(endpoint, six.string_types):
        raise ValueError("Expected endpoint")

    version = None
    # Get rid of trailing '/' if present
    endpoint = endpoint.rstrip('/')
    url_parts = parse.urlparse(endpoint)
    (scheme, netloc, path, __, __, __) = url_parts
    path = path.lstrip('/')
    # regex to match 'v1' or 'v2.0' etc
    if re.match('v\d+\.?\d*', path):
        version = float(path.lstrip('v'))
        endpoint = scheme + '://' + netloc
    return endpoint, version
项目:zun    作者:openstack    | 项目源码 | 文件源码
def validate_link(self, link, bookmark=False):
        """Checks if the given link can get correct data."""
        # removes the scheme and net location parts of the link
        url_parts = list(urlparse.urlparse(link))
        url_parts[0] = url_parts[1] = ''

        # bookmark link should not have the version in the URL
        if bookmark and url_parts[2].startswith(PATH_PREFIX):
            return False

        full_path = urlparse.urlunparse(url_parts)
        try:
            self.get_json(full_path, path_prefix='')
            return True
        except Exception:
            return False
项目:python-kingbirdclient    作者:openstack    | 项目源码 | 文件源码
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        return request.urlopen(definition_url).read().decode('utf8')
    except Exception:
        return contents_or_file_name
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:dataplicity-lomond    作者:wildfoundry    | 项目源码 | 文件源码
def __init__(self, url, protocols=None, agent=None):
        self.url = url
        self.protocols = protocols or []
        self.agent = agent or constants.USER_AGENT

        self._headers = []

        _url = urlparse(url)
        self.scheme = _url.scheme
        host, _, port = _url.netloc.partition(':')
        if not port:
            port = '443' if self.scheme == 'wss' else '80'
        if not port.isdigit():
            raise ValueError('illegal port value')
        port = int(port)
        self.host = host
        self.port = port
        self._host_port = "{}:{}".format(host, port)
        self.resource = _url.path or '/'
        if _url.query:
            self.resource = "{}?{}".format(self.resource, _url.query)

        self.state = self.State()
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_redirects_without_credentials(self):
        request = self.factory.get('/test')
        request.session = self.session

        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse('test')  # pragma: NO COVER

        response = test_view(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        self.assertEqual(parse.urlparse(response['Location']).path,
                         '/oauth2/oauth2authorize/')
        self.assertIn(
            'return_url=%2Ftest', parse.urlparse(response['Location']).query)

        self.assertEqual(response.status_code,
                         http.HttpResponseRedirect.status_code)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:portia2code    作者:scrapinghub    | 项目源码 | 文件源码
def extract_image_url(text):
    text = _strip_url(text)
    imgurl = None
    if text:
        # check if the text is style content
        match = _CSS_IMAGERE.search(text)
        text = match.groups()[0] if match else text
        parsed = urlparse(text)
        path = None
        match = _IMAGE_PATH_RE.search(parsed.path)
        if match:
            path = match.group()
        elif parsed.query:
            match = _GENERIC_PATH_RE.search(parsed.path)
            if match:
                path = match.group()
        if path is not None:
            parsed = list(parsed)
            parsed[2] = path
            imgurl = urlunparse(parsed)
        if not imgurl:
            imgurl = text
    return imgurl
项目:deb-oslo.vmware    作者:openstack    | 项目源码 | 文件源码
def _fix_esx_url(url, host, port):
        """Fix netloc in the case of an ESX host.

        In the case of an ESX host, the netloc is set to '*' in the URL
        returned in HttpNfcLeaseInfo. It should be replaced with host name
        or IP address.
        """
        urlp = urlparse.urlparse(url)
        if urlp.netloc == '*':
            scheme, netloc, path, params, query, fragment = urlp
            if netutils.is_valid_ipv6(host):
                netloc = '[%s]:%d' % (host, port)
            else:
                netloc = "%s:%d" % (host, port)
            url = urlparse.urlunparse((scheme,
                                       netloc,
                                       path,
                                       params,
                                       query,
                                       fragment))
        return url
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:myautotest    作者:auuppp    | 项目源码 | 文件源码
def _decorate_request(self, filters, method, url, headers=None, body=None,
                          auth_data=None):
        if auth_data is None:
            auth_data = self.auth_data
        token, _ = auth_data
        #base_url = self.base_url(filters=filters, auth_data=auth_data)
        base_url = url
        # build authenticated request
        # returns new request, it does not touch the original values
        _headers = copy.deepcopy(headers) if headers is not None else {}
        _headers['X-Auth-Token'] = str(token)
        if url is None or url == "":
            _url = base_url
        else:
            # Join base URL and url, and remove multiple contiguous slashes
            _url = "/".join([base_url, url])
            parts = [x for x in urlparse.urlparse(_url)]
            parts[2] = re.sub("/{2,}", "/", parts[2])
            _url = urlparse.urlunparse(parts)
        # no change to method or body
        return str(_url), _headers, body
项目:http-prompt    作者:eliangcs    | 项目源码 | 文件源码
def visit_ls(self, node, children):
        path = urlparse(self.context_override.url).path
        path = filter(None, path.split('/'))
        nodes = self.context.root.ls(*path)
        if self.output.isatty():
            names = []
            for node in nodes:
                token_type = String if node.data.get('type') == 'dir' else Name
                name = self._colorize(node.name, token_type)
                names.append(name)
            lines = list(colformat(list(names)))
        else:
            lines = [n.name for n in nodes]
        if lines:
            self.output.write('\n'.join(lines))
        return node
项目:coverage2sql    作者:openstack    | 项目源码 | 文件源码
def _drop_db(self):
        addr = urlparse.urlparse(self.url)
        database = addr.path.strip('/')
        loc_pieces = addr.netloc.split('@')
        host = loc_pieces[1]
        auth_pieces = loc_pieces[0].split(':')
        user = auth_pieces[0]
        password = ""
        if len(auth_pieces) > 1:
            if auth_pieces[1].strip():
                password = "-p\"%s\"" % auth_pieces[1]
        sql = ("drop database if exists %(database)s; create "
               "database %(database)s;") % {'database': database}
        cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
               "-e \"%(sql)s\"") % {'user': user, 'password': password,
                                    'host': host, 'sql': sql}
        execute_cmd(cmd)
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def get_connection_params(endpoint, **kwargs):
        parts = urlparse.urlparse(endpoint)

        # trim API version and trailing slash from endpoint
        path = parts.path
        path = path.rstrip('/').rstrip(API_VERSION)

        _args = (parts.hostname, parts.port, path)
        _kwargs = {'timeout': (float(kwargs.get('timeout'))
                               if kwargs.get('timeout') else 600)}

        if parts.scheme == 'https':
            _class = VerifiedHTTPSConnection
            _kwargs['ca_file'] = kwargs.get('ca_file', None)
            _kwargs['cert_file'] = kwargs.get('cert_file', None)
            _kwargs['key_file'] = kwargs.get('key_file', None)
            _kwargs['insecure'] = kwargs.get('insecure', False)
        elif parts.scheme == 'http':
            _class = six.moves.http_client.HTTPConnection
        else:
            msg = 'Unsupported scheme: %s' % parts.scheme
            raise exceptions.EndpointException(msg)

        return (_class, _args, _kwargs)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_redirects_without_credentials(self):
        request = self.factory.get('/test')
        request.session = self.session

        @decorators.oauth_required
        def test_view(request):
            return http.HttpResponse('test')  # pragma: NO COVER

        response = test_view(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        self.assertEqual(parse.urlparse(response['Location']).path,
                         '/oauth2/oauth2authorize/')
        self.assertIn(
            'return_url=%2Ftest', parse.urlparse(response['Location']).query)

        self.assertEqual(response.status_code,
                         http.HttpResponseRedirect.status_code)
项目:partycrasher    作者:naturalness    | 项目源码 | 文件源码
def test_absolute_url(self):
        """
        Does the server return absolute URIs?

        Relies on the root returing a ``self`` object.
        """

        response = requests.get(self.root_url)

        resource = response.json().get('href')
        assert resource is not None

        href = urlparse(resource)
        # Test in decreasing order of likely correctness:
        # Path => Domain => Scheme
        assert href.path == '/'
        # Get the origin name, minus the scheme.
        assert href.netloc == urlparse(self.origin).netloc
        assert href.scheme == 'http'

        resource = response.json().get('buckets').get('1.0')
        href = urlparse(resource)
        assert href.path.endswith('buckets/1.0')
项目:rets    作者:refindlyllc    | 项目源码 | 文件源码
def add_capability(self, name, uri):
        """
        Add a capability of the RETS board
        :param name: The name of the capability
        :param uri: The capability URI given by the RETS board
        :return: None
        """

        parse_results = urlparse(uri)
        if parse_results.hostname is None:
            # relative URL given, so build this into an absolute URL
            login_url = self.capabilities.get('Login')
            if not login_url:
                logger.error("There is no login URL stored, so additional capabilities cannot be added.")
                raise ValueError("Cannot automatically determine absolute path for {0!s} given.".format(uri))

            parts = urlparse(login_url)
            uri = parts.scheme + '://' + parts.hostname + '/' + uri.lstrip('/')

        self.capabilities[name] = uri
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:lddmm-ot    作者:jeanfeydy    | 项目源码 | 文件源码
def _get_proxy_config(self):
        """
        Determine if self._url should be passed through a proxy. If so, return
        the appropriate proxy_server and proxy_port. Assumes https_proxy is used
        when ssl_enabled=True.

        """

        proxy_server = None
        proxy_port = None
        ssl_enabled = self._ssl_enabled

        if ssl_enabled:
            proxy = os.environ.get("https_proxy")
        else:
            proxy = os.environ.get("http_proxy")
        no_proxy = os.environ.get("no_proxy")
        no_proxy_url = no_proxy and self._server in no_proxy

        if proxy and not no_proxy_url:
            p = urlparse(proxy)
            proxy_server = p.hostname
            proxy_port = p.port

        return proxy_server, proxy_port
项目:elm    作者:ContinuumIO    | 项目源码 | 文件源码
def __init__(self, base_url='https://hydro1.gesdisc.eosdis.nasa.gov/data/NLDAS/', **layout_kwargs):
        self.base_url = base_url
        self.selected_url = None
        if 'min_width' not in layout_kwargs:
            layout_kwargs['min_width'] = '30%'
        self.label_layout = Layout(**layout_kwargs)

        dd = widgets.Select(
            options=self.get_links(
                base_url,
                href_filter=self.dir_and_not_data,
            ),
            description='', #urlparse(base_url).path,
        )

        dd.observe(partial(self.on_value_change, url=self.base_url), names='value')
        lbl = widgets.Label(urlparse(self.base_url).path, layout=self.label_layout)
        hbox = widgets.HBox([lbl, dd])
        self.elts = [hbox, lbl, dd]
        display(hbox)
项目:elm    作者:ContinuumIO    | 项目源码 | 文件源码
def on_value_change(self, change, url):
        next_url = change['new']
        if next_url is None: # 'Select...' chosen
            return
        if next_url.endswith('.grb'): # File reached
            return self.select_url(next_url)
        [w.close() for w in self.elts]
        links = self.get_links(next_url,
                               href_filter=(self.dir_and_not_data
                                            if next_url == self.base_url
                                            else self.dir_or_grib))
        if not links:
            return
        next_dd = widgets.Select(
            options=links,
            description='', #urlparse(url).path,
        )
        next_dd.observe(partial(self.on_value_change, url=next_url), names='value')
        lbl = widgets.Label(urlparse(next_url).path, layout=self.label_layout)
        hbox = widgets.HBox([lbl, next_dd])
        self.elts = [hbox, lbl, next_dd]
        display(hbox)
项目:coolrelation    作者:mrtial    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:bareon-ironic    作者:openstack    | 项目源码 | 文件源码
def validate_href(self, image_href):
        path = urlparse.urlparse(image_href).path.lstrip('/')
        if not path:
            raise exception.ImageRefValidationFailed(
                _("No path specified in swift resource reference: %s. "
                  "Reference must be like swift:container/path")
                % str(image_href))

        container, s, object = path.partition('/')
        try:
            headers = self.client.head_object(container, object)
        except exception.SwiftOperationError as e:
            raise exception.ImageRefValidationFailed(
                _("Cannot fetch %(url)s resource. %(exc)s") %
                dict(url=str(image_href), exc=str(e)))

        return (container, object, headers)
项目:bareon-ironic    作者:openstack    | 项目源码 | 文件源码
def validate_href(self, image_href):
        path = urlparse.urlparse(image_href).path.lstrip('/')
        if not path:
            raise exception.InvalidParameterValue(
                _("No path specified in rsync resource reference: %s. "
                  "Reference must be like rsync:host::module/path")
                % str(image_href))
        try:
            stdout, stderr = utils.execute(
                'rsync', '--stats', '--dry-run',
                path,
                ".",
                check_exit_code=[0],
                log_errors=processutils.LOG_ALL_ERRORS)
            return path, stdout, stderr

        except (processutils.ProcessExecutionError, OSError) as ex:
            raise exception.ImageRefValidationFailed(
                _("Cannot fetch %(url)s resource. %(exc)s") %
                dict(url=str(image_href), exc=str(ex)))
项目:bareon-ironic    作者:openstack    | 项目源码 | 文件源码
def get_glance_image_uuid_name(task, url):
    """Converting glance links.

    Links like:
        glance:name
        glance://name
        glance:uuid
        glance://uuid
        name
        uuid
    are converted to tuple
        uuid, name

    """
    urlobj = urlparse.urlparse(url)
    if urlobj.scheme and urlobj.scheme != 'glance':
        raise exception.InvalidImageRef("Only glance images are supported.")

    path = urlobj.path or urlobj.netloc
    img_info = _get_glanceclient(task.context).show(path)
    return img_info['id'], img_info['name']
项目:bareon-ironic    作者:openstack    | 项目源码 | 文件源码
def url_download_raw_secured(context, node, url):
    """Download raw contents of the URL bypassing cache and temp files."""
    if not url:
        return
    url = url.rstrip('/')
    url = append_storage_prefix(node, url)

    scheme = parse.urlparse(url).scheme
    sources_with_tenant_isolation = ('glance', 'swift')
    if scheme not in sources_with_tenant_isolation:
        raise bareon_exception.UnsafeUrlError(
            url=url,
            details="Use one of the following storages "
                    "for this resource: %s"
                    % str(sources_with_tenant_isolation))

    resource_storage = image_service.get_image_service(
        url, context=context)
    out = StringIO.StringIO()
    try:
        resource_storage.download(url, out)
        return out.getvalue()
    finally:
        out.close()
项目:Scrapy-BenchCLI    作者:Parth-Vader    | 项目源码 | 文件源码
def __init__(self, **kw):
        super(FollowAllSpider, self).__init__(**kw)
        url = 'http://localhost/books.toscrape.com/index.html'
        if not url.startswith('http://') and not url.startswith('https://'):
            url = 'http://%s/' % url
        self.url = url
        self.allowed_domains = [re.sub(r'^www\.', '', urlparse(url).hostname)]
        self.link_extractor = LinkExtractor()
        self.cookies_seen = set()
        self.previtem = 0
        self.items = 0
        self.timesec = datetime.datetime.utcnow()
项目:PornGuys    作者:followloda    | 项目源码 | 文件源码
def __init__(self, requirement_string):
        try:
            req = REQUIREMENT.parseString(requirement_string)
        except ParseException as e:
            raise InvalidRequirement(
                "Invalid requirement, parse error at \"{0!r}\"".format(
                    requirement_string[e.loc:e.loc + 8]))

        self.name = req.name
        if req.url:
            parsed_url = urlparse.urlparse(req.url)
            if not (parsed_url.scheme and parsed_url.netloc) or (
                    not parsed_url.scheme and not parsed_url.netloc):
                raise InvalidRequirement("Invalid URL given")
            self.url = req.url
        else:
            self.url = None
        self.extras = set(req.extras.asList() if req.extras else [])
        self.specifier = SpecifierSet(req.specifier)
        self.marker = req.marker if req.marker else None
项目:dnsknife    作者:Gandi    | 项目源码 | 文件源码
def params(arg):
    """
    Parse `arg` and returns a dict describing the TPDA
    arguments.

    Resolution order:
        - `arg` is a string is parsed as an URL
        - `arg` has an items() method and is parsed
           (works for multidicts, ..)
        - `arg` is a iterable is parsed as a
           ((name, value), (name, value), ..)
    """
    if isinstance(arg, str):
        pr = parse.urlparse(arg)
        if not pr.query:
            raise exceptions.IncompleteURI
        return Params(parse.parse_qsl(pr.query)).params()
    elif hasattr(arg, 'items'):
        return Params(arg.items()).params()
    elif hasattr(arg, '__iter__'):
        return Params(arg).params()
项目:momo    作者:gusibi    | 项目源码 | 文件源码
def media_for(hash_key, scheme=None, style=None):
    if not hash_key:
        return None
    url = hash_key.split('!')[0]
    up = urlparse(url)
    hash_domain = up.hostname
    if hash_domain and hash_domain not in Config.QINIU_DOMAINS:
        if hash_domain == 'wx.qlogo.cn':
            hash_key = hash_key.replace('http://', 'https://')
        return hash_key
    _hash = up.path
    if len(_hash) != 0 and _hash[0] == '/':
        _hash = _hash[1:]

    media_host = Config.QINIU_HOST
    url = '%s/%s' % (media_host, _hash)
    if url.endswith('.amr'):
        url = '%s.mp3' % url[:-4]
    if url and style:
        url = '%s!%s' % (url, style)
    return url
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def create_temp_url(swift_client, name, timeout, container=None):

    container = container or '%(name)s-%(uuid)s' % {
        'name': name, 'uuid': uuid.uuid4()}
    object_name = str(uuid.uuid4())

    swift_client.put_container(container)
    key_header = 'x-account-meta-temp-url-key'
    if key_header not in swift_client.head_account():
        swift_client.post_account({
            key_header: six.text_type(uuid.uuid4())[:32]})

    key = swift_client.head_account()[key_header]
    project_path = swift_client.url.split('/')[-1]
    path = '/v1/%s/%s/%s' % (project_path, container, object_name)
    timeout_secs = timeout * 60
    tempurl = swiftclient_utils.generate_temp_url(path, timeout_secs, key,
                                                  'PUT')
    sw_url = urlparse.urlparse(swift_client.url)
    put_url = '%s://%s%s' % (sw_url.scheme, sw_url.netloc, tempurl)
    swift_client.put_object(container, object_name, '')
    return put_url
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _pagination(self, collection, path, **params):
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        next = True
        while next:
            res = self.get(path, params=params)
            yield res
            next = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        next = True
                        break
            except KeyError:
                break
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _pagination(self, collection, path, **params):
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        next = True
        while next:
            res = self.get(path, params=params)
            yield res
            next = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        next = True
                        break
            except KeyError:
                break
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _pagination(self, collection, path, **params):
        if params.get('page_reverse', False):
            linkrel = 'previous'
        else:
            linkrel = 'next'
        next = True
        while next:
            res = self.get(path, params=params)
            yield res
            next = False
            try:
                for link in res['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        query_str = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(query_str)
                        next = True
                        break
            except KeyError:
                break
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def _get_node_ip_heartbeat(task):
    callback_url = task.node.driver_internal_info.get('agent_url', '')
    return urlparse.urlparse(callback_url).netloc.split(':')[0]
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def _parse_root_device_hints(node):
    """Convert string with hints to dict. """
    root_device = node.properties.get('root_device')
    if not root_device:
        return {}
    try:
        parsed_hints = irlib_utils.parse_root_device_hints(root_device)
    except ValueError as e:
        raise exception.InvalidParameterValue(
            _('Failed to validate the root device hints for node %(node)s. '
              'Error: %(error)s') % {'node': node.uuid, 'error': e})
    root_device_hints = {}
    advanced = {}
    for hint, value in parsed_hints.items():
        if isinstance(value, six.string_types):
            if value.startswith('== '):
                root_device_hints[hint] = int(value[3:])
            elif value.startswith('s== '):
                root_device_hints[hint] = urlparse.unquote(value[4:])
            else:
                advanced[hint] = value
        else:
            root_device_hints[hint] = value
    if advanced:
        raise exception.InvalidParameterValue(
            _('Ansible-deploy does not support advanced root device hints '
              'based on oslo.utils operators. '
              'Present advanced hints for node %(node)s are %(hints)s.') % {
                  'node': node.uuid, 'hints': advanced})
    return root_device_hints
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def _prepare_variables(task):
    node = task.node
    i_info = node.instance_info
    image = {}
    for i_key, i_value in i_info.items():
        if i_key.startswith('image_'):
            image[i_key[6:]] = i_value
    image['mem_req'] = _calculate_memory_req(task)

    checksum = image.get('checksum')
    if checksum:
        # NOTE(pas-ha) checksum can be in <algo>:<checksum> format
        # as supported by various Ansible modules, mostly good for
        # standalone Ironic case when instance_info is populated manually.
        # With no <algo> we take that instance_info is populated from Glance,
        # where API reports checksum as MD5 always.
        if ':' not in checksum:
            image['checksum'] = 'md5:%s' % checksum
    _add_ssl_image_options(image)
    variables = {'image': image}
    configdrive = i_info.get('configdrive')
    if configdrive:
        if urlparse.urlparse(configdrive).scheme in ('http', 'https'):
            cfgdrv_type = 'url'
            cfgdrv_location = configdrive
        else:
            cfgdrv_location = _get_configdrive_path(node.uuid)
            with open(cfgdrv_location, 'w') as f:
                f.write(configdrive)
            cfgdrv_type = 'file'
        variables['configdrive'] = {'type': cfgdrv_type,
                                    'location': cfgdrv_location}

    root_device_hints = _parse_root_device_hints(node)
    if root_device_hints:
        variables['root_device_hints'] = root_device_hints

    return variables
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_addon_host_ident():
    ident = urlparse(options.get('system.url-prefix')).hostname
    if ident in ('localhost', '127.0.0.1', None, ''):
        return 'app.dev.getsentry.com'
    return ident
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def base_url(url):
    result = urlparse(url)
    return '%s://%s' % (result.scheme, result.netloc)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def from_request(cls, request):
        host = urlparse(request.url).netloc
        return cls('Unable to reach host: {}'.format(host))
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def get_decorator(placebo):
    url = placebo._get_url()

    if isinstance(url, six.string_types):
        url = parse.urlparse(url)

    # TODO should we remove empty parts of url?
    match_kwargs = {
        'scheme': url.scheme,
        'netloc': url.netloc,
        'path': url.path,
        'method': placebo._get_method(),
        'query': url.query
    }

    match_kwargs = {k: v
                    for k, v in match_kwargs.items()
                    if v}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        url = parse.urlparse(url.geturl())
        # if body is empty httmock returns None
        # but we want ot to be always string.
        body = request.body or ''
        headers = request.headers
        return {'status_code': placebo._get_status(url, headers, body),
                'content': placebo._get_body(url, headers, body),
                'headers': placebo._get_headers(url, headers, body)}

    return httmock.with_httmock(mock_response)
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def __init__(self, url, headers, body):
        if isinstance(url, (parse.ParseResult, parse.SplitResult)):
            self.url = url.geturl()
            self.parsed_url = url
        elif isinstance(url, six.string_types):
            self.url = url
            self.parsed_url = parse.urlparse(url)

        self.headers = headers
        self.body = body
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def get_decorator(placebo):
    url = placebo._get_url()

    if isinstance(url, six.string_types):
        url = parse.urlparse(url)

    # TODO should we remove empty parts of url?
    match_kwargs = {
        'scheme': url.scheme,
        'netloc': url.netloc,
        'path': url.path,
        'method': placebo._get_method(),
        'query': url.query
    }

    match_kwargs = {k: v
                    for k, v in match_kwargs.items()
                    if v}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        url = parse.urlparse(url.geturl())
        # if body is empty httmock returns None
        # but we want ot to be always string.
        body = request.body or ''
        headers = request.headers
        return {'status_code': placebo._get_status(url, headers, body),
                'content': placebo._get_body(url, headers, body),
                'headers': placebo._get_headers(url, headers, body)}

    return httmock.with_httmock(mock_response)