Python urlparse 模块,ParseResult() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用urlparse.ParseResult()

项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def strip_subfolder(url):
    """
    Return url with the configured subfolder removed from the start of its
    path, so the exact same share url is always used for saving counts.

    The subfolder is read from the 'SUBFOLDER' entry of the application
    config.  The url is returned unchanged when no subfolder is configured
    or when the path does not start with it.
    """

    prefix = app.config.get('SUBFOLDER', None)
    if not prefix:
        return url

    parts = urlparse.urlparse(url)
    if not parts.path.startswith(prefix):
        return url

    # Only the first occurrence is dropped; the startswith() check above
    # guarantees that occurrence is the leading one.
    trimmed_path = parts.path.replace('%s' % (prefix), '', 1)
    rebuilt = urlparse.ParseResult(
        parts.scheme, parts.netloc, trimmed_path,
        parts.params, parts.query, parts.fragment)
    return rebuilt.geturl()
项目:circlecli    作者:TheRealJoeLinux    | 项目源码 | 文件源码
def _build_url(self, endpoint, params=None):
        """Return the full URL for the desired endpoint.

        Args:
            endpoint (str): the API endpoint after base URL
            params (dict): any extra query params to include in the request.
                Entries override the default 'circle-token' value on a key
                clash. Defaults to None (no extra params).

        Returns:
            (str) the full URL of the request
        """
        # The token is always sent as a query parameter; caller-supplied
        # params are merged on top of it. A None default (instead of a
        # shared mutable {}) keeps calls independent of each other.
        new_params = {'circle-token': self._token}
        if params:
            new_params.update(params)

        parsed_url = urlparse(self._base_url)
        new_parse = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc,
                                path='/'.join((parsed_url.path, endpoint)),
                                params='', query=urlencode(new_params),
                                fragment='')

        return urlunparse(new_parse)
项目:stormtrooper    作者:CompileInc    | 项目源码 | 文件源码
def normalize_website(cls, w):
        """
        Normalise a website value to the form ``http://<netloc><path>[?query]``.

        The value is lower-cased and stripped, given an ``http://`` prefix
        when it has neither ``http://`` nor ``https://``, and rebuilt with:
        scheme forced to 'http', netloc taken from ``cls.get_website_tld``,
        the trailing '/' removed from the path, the query kept, and params
        and fragment dropped.

        :param w: website as a byte string or text; may be an empty value
        :return: the normalised URL, or None when ``w`` is one of Django's
                 EMPTY_VALUES or cannot be parsed
        """
        from django.core.validators import EMPTY_VALUES
        from urlparse import urlparse, urlunparse, ParseResult
        # Decode only genuine byte strings. Doing this unconditionally (and
        # before the emptiness check) crashed on None and on other empty
        # values that have no .decode().
        if isinstance(w, bytes):
            w = w.decode('utf-8')
        if w in EMPTY_VALUES:
            return None
        w = w.lower().strip()
        if not w.startswith(('http://', 'https://')):
            w = 'http://' + w.lstrip('/')
        try:
            parsed = urlparse(w)
        except ValueError:
            return None
        new_parsed = ParseResult(scheme='http',
                                 netloc=cls.get_website_tld(w),
                                 path=parsed.path.rstrip('/'),
                                 params='',
                                 query=parsed.query,
                                 fragment='')
        return urlunparse(new_parsed)
项目:litchi    作者:245967906    | 项目源码 | 文件源码
def post(self, request, pk):
        """
        Email the user with id ``pk`` a signed link to the
        'core:SetPassword' view and report whether the send succeeded.

        The link carries the user's email and an MD5 hex digest of
        email + settings.SECRET_KEY as query parameters; its scheme and
        netloc are taken from the incoming request.

        :return: HttpResponse whose body depends on the send status
        """
        user = User.objects.get(id=pk)
        # NOTE(review): unkeyed MD5 over email + SECRET_KEY; whatever checks
        # this signature must compute it the same way.
        sign = hashlib.md5(user.email + settings.SECRET_KEY).hexdigest()

        host = urlparse.urlparse(request.get_raw_uri()).netloc
        target_path = reverse(('core:SetPassword'))
        query_string = urllib.urlencode({'email': user.email, 'sign': sign})
        url = urlparse.ParseResult(
            request.scheme, host, target_path, '', query_string, '',
        ).geturl()

        html_body = get_template('users/user_email_activate.html').render({'url': url})
        msg = EmailMultiAlternatives(
            subject='??????',
            body=html_body,
            from_email=settings.EMAIL_HOST_USER,
            to=[user.email,],
        )
        msg.content_subtype = 'html'

        # fail_silently=True: a failed send is reported through the response
        # text rather than an exception.
        if msg.send(fail_silently=True):
            response = '??????'
        else:
            response = '??????, ???'
        return HttpResponse(response)
项目:ws-cli    作者:hack4sec    | 项目源码 | 文件源码
def build_path(link, url_path):
        """ Return link with a full path, resolving relative links against url_path.

        A link whose path already starts with '/' is returned as-is.
        Otherwise its path is appended (after a '/') to the result of
        SpiderCommon.del_file_from_path(url_path); all other components
        are copied from link.
        """
        relative = link.path
        if relative[:1] == '/':
            return link

        base_dir = SpiderCommon.del_file_from_path(url_path)
        return ParseResult(
            link.scheme, link.netloc, base_dir + "/" + relative,
            link.params, link.query, link.fragment)
项目:OUXMLConverter    作者:chaotic-kingdoms    | 项目源码 | 文件源码
def replace_qs_param(url, params):
        """ Add GET params to provided URL being aware of existing.
        from: http://stackoverflow.com/a/25580545

        Existing query arguments are kept unless ``params`` supplies the
        same name, in which case the new value wins.

        :param url: string of target URL
        :param params: dict containing requested params to be added
        :return: string with updated URL
        """

        # Parse the URL exactly as given. Unquoting the whole URL first
        # would turn escaped delimiters inside values (%26, %3D, %23, ...)
        # into real ones and silently corrupt or drop existing arguments.
        parsed_url = urlparse(url)

        # parse_qsl percent-decodes each name/value pair individually;
        # urlencode below re-encodes them.
        parsed_get_args = dict(parse_qsl(parsed_url.query))
        parsed_get_args.update(params)

        # Converting URL arguments to a proper query string
        encoded_get_args = urllib.urlencode(parsed_get_args, doseq=True)
        return ParseResult(
            parsed_url.scheme, parsed_url.netloc, parsed_url.path,
            parsed_url.params, encoded_get_args, parsed_url.fragment
        ).geturl()
项目:hls-player    作者:weimingtom    | 项目源码 | 文件源码
def make_url(base_url, url):
    """
    Resolve url against base_url and optionally shift its port.

    A url without a scheme is joined onto base_url.  When the environment
    variable HLS_PLAYER_SHIFT_PORT is set, its integer value is added to
    the port of the resulting URL: to the explicit port when the netloc
    has one, or to the default port 80 for plain ``http`` URLs.  URLs with
    another scheme and no explicit port are left untouched.

    :param base_url: URL used to resolve a scheme-less url
    :param url: absolute or relative URL
    :return: the resulting URL string
    """
    if urlparse.urlsplit(url).scheme == '':
        url = urlparse.urljoin(base_url, url)
    if 'HLS_PLAYER_SHIFT_PORT' in os.environ:
        shift = int(os.environ['HLS_PLAYER_SHIFT_PORT'])
        p = urlparse.urlparse(url)
        loc = p.netloc
        if ':' in loc:
            # Split on the LAST colon so "user:pw@host:port" and bracketed
            # IPv6 hosts with a port still unpack into exactly two parts.
            host, port = loc.rsplit(':', 1)
            loc = host + ":" + str(int(port) + shift)
        elif p.scheme == "http":
            # Shift the implicit default port (previously the bare shift
            # value was appended instead of 80 + shift).
            loc = loc + ":" + str(80 + shift)
        p = urlparse.ParseResult(scheme=p.scheme,
                                 netloc=loc,
                                 path=p.path,
                                 params=p.params,
                                 query=p.query,
                                 fragment=p.fragment)
        url = urlparse.urlunparse(p)
    return url
项目:daenerys    作者:dongweiming    | 项目源码 | 文件源码
def validate_url(self, url):
        """
        Percent-quote the path and query of a parsed URL and check it.

        The path is quoted keeping '/' and '%' as-is; the query is quoted
        keeping '?', '=' and '&' as-is.  The rebuilt URL is accepted when it
        has a non-empty hostname, an http/https scheme, and a path that is
        either empty or starts with '/'.

        :param url: parsed URL exposing the six ParseResult components
        :return: a new ParseResult with quoted path and query
        :raises NotSupported: when any of the checks fails
        """
        quoted_path = urllib.quote(url.path, safe=b"/%")
        quoted_query = urllib.quote(url.query, safe=b"?=&")
        url = ParseResult(url.scheme, url.netloc, quoted_path,
                          url.params, quoted_query, url.fragment)

        checks = (
            bool(url.hostname),
            url.scheme in ("http", "https"),
            url.path == "" or url.path.startswith("/"),
        )
        if not all(checks):
            raise NotSupported("invalid url: %s" % repr(url))

        return url
项目:upnpclient    作者:flyte    | 项目源码 | 文件源码
def test_marshal_uri(self):
        """
        Marshalling a 'uri' value should succeed and yield a `ParseResult`.
        """
        was_marshalled, parsed = upnp.marshal.marshal_value(
            'uri',
            'https://media.giphy.com/media/22kxQ12cxyEww/giphy.gif?something=variable',
        )
        self.assertTrue(was_marshalled)
        self.assertIsInstance(parsed, ParseResult)
项目:fast_urlparse    作者:Parsely    | 项目源码 | 文件源码
def test_result_pairs(self):
        # Run the shared encode/decode check once per result type, in the
        # order Defrag -> Split -> Parse.
        for result_type in (urlparse.DefragResult,
                            urlparse.SplitResult,
                            urlparse.ParseResult):
            self._check_result_type(result_type)
项目:python-phoenixdb    作者:lalinsky    | 项目源码 | 文件源码
def parse_url(url):
    """
    Parse url, treating a bare "host" or "host:port" string as an HTTP URL.

    When parsing yields no scheme and no netloc but a non-empty path, that
    path is used as the netloc (with ":8765" appended if it contains no
    ':') of a new ``http`` URL whose path is '/'.  Anything else is
    returned exactly as urlparse produced it.
    """
    parsed = urlparse.urlparse(url)
    looks_like_bare_host = not (parsed.scheme or parsed.netloc) and parsed.path
    if not looks_like_bare_host:
        return parsed

    host = parsed.path
    if ':' not in host:
        host = '{}:8765'.format(host)
    return urlparse.ParseResult('http', host, '/', '', '', '')


# Defined in phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
项目:py-ipv8    作者:qstokkink    | 项目源码 | 文件源码
def add_url_params(url, params):
    """ Add GET params to provided URL being aware of existing.

    Existing query arguments are kept unless ``params`` supplies the same
    name.  bool and dict values are serialised with ``dumps`` before
    encoding; list values produce one ``name=value`` pair per item.

    :param url: string of target URL
    :param params: dict containing requested params to be added
    :return: string with updated URL

    >> url = 'http://stackoverflow.com/test?answers=true'
    >> new_params = {'answers': False, 'data': ['some','values']}
    >> add_url_params(url, new_params)
    'http://stackoverflow.com/test?answers=false&data=some&data=values'

    (argument order in the result follows dict iteration order)
    """
    # Parse the URL exactly as given. Unquoting the whole URL first would
    # turn escaped delimiters inside values (%26, %3D, %23, ...) into real
    # ones and silently corrupt or drop existing arguments.
    parsed_url = urlparse(url)
    # parse_qsl percent-decodes each name/value pair individually
    parsed_get_args = dict(parse_qsl(parsed_url.query))
    # Merging URL arguments dict with new params
    parsed_get_args.update(params)

    # Bool and Dict values should be converted to json-friendly values
    parsed_get_args.update(
        {k: dumps(v) for k, v in parsed_get_args.items()
         if isinstance(v, (bool, dict))}
    )

    # Converting URL arguments to a proper query string
    encoded_get_args = urlencode(parsed_get_args, doseq=True)
    # Same components as the parsed input, with only the query replaced
    return ParseResult(
        parsed_url.scheme, parsed_url.netloc, parsed_url.path,
        parsed_url.params, encoded_get_args, parsed_url.fragment
    ).geturl()
项目:Eagle    作者:magerx    | 项目源码 | 文件源码
def __init__(self, init):
        """
        Copy the six URL components (query, params, fragment, path, netloc,
        scheme) from ``init`` — a urlparse.ParseResult-like object — onto
        this instance as plain attributes.
        """
        for component in ('query', 'params', 'fragment',
                          'path', 'netloc', 'scheme'):
            setattr(self, component, getattr(init, component))
项目:Eagle    作者:magerx    | 项目源码 | 文件源码
def to_standard(self):
        """
        Return a standard urlparse.ParseResult built from this object's
        scheme, netloc, path, params, query and fragment attributes.
        """
        components = (self.scheme, self.netloc, self.path,
                      self.params, self.query, self.fragment)
        return ParseResult(*components)
项目:ws-cli    作者:hack4sec    | 项目源码 | 文件源码
def test_link_allowed(self):
        # With 'allow_regexp' set to /allowed/, a matching path must be
        # accepted and a non-matching one rejected.
        Registry().set('allow_regexp', re.compile('allowed'))

        matching = ParseResult('', '', "/abc/allowed.php", '', '', '')
        assert self.model._link_allowed(matching)

        rejected = ParseResult('', '', "/denied2.php", '', '', '')
        assert not self.model._link_allowed(rejected)
项目:ws-cli    作者:hack4sec    | 项目源码 | 文件源码
def test_build_path(self):
        # A link whose path is already absolute comes back unchanged.
        absolute = ParseResult('', '', "/abc/allowed.php", '', '', '')
        assert self.model.build_path(absolute, "def") == absolute

        # A relative path is expected to be placed under the page's directory.
        relative = ParseResult('', '', "abc/allowed.php", '', '', '')
        expected = ParseResult('', '', "/a/abc/allowed.php", '', '', '')
        assert self.model.build_path(relative, "/a/") == expected
项目:ws-cli    作者:hack4sec    | 项目源码 | 文件源码
def test_clear_link(self):
        # clear_link is expected to normalise the messy path while leaving
        # the query string untouched.
        messy = ParseResult('', '', "/ab\\c//./d/../allowed.php", '', '?a=b&c=d', '')
        cleaned = ParseResult('', '', "/ab/c/allowed.php", '', '?a=b&c=d', '')
        assert self.model.clear_link(messy) == cleaned
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def replicate_per_farm_dbs(cloud_url=None, local_url=None, farm_name=None):
    """
    Set up replication of the per-farm databases from the local server to the
    cloud server.

    Each database in ``per_farm_dbs`` is replicated continuously to
    ``<cloud_url>/<username>/<farm_name>/<db_name>``.  The cloud username
    and password are added to the cloud url unless it already contains a
    username.

    :param str cloud_url: Used to override the cloud url from the global
    configuration in case the calling function is in the process of
    initializing the cloud server
    :param str local_url: Used to override the local url from the global
    configuration in case the calling function is in the process of
    initializing the local server
    :param str farm_name: Used to override the farm name from the global
    configuration in case the calling function is in the process of
    initializing the farm
    """
    cloud_url = cloud_url or config["cloud_server"]["url"]
    local_url = local_url or config["local_server"]["url"]
    farm_name = farm_name or config["cloud_server"]["farm_name"]
    username = config["cloud_server"]["username"]
    password = config["cloud_server"]["password"]

    # Add credentials to the cloud url. Default to the existing netloc so a
    # url that already carries a username no longer hits an unbound
    # `new_netloc` below.
    parsed_cloud_url = urlparse(cloud_url)
    new_netloc = parsed_cloud_url.netloc
    if not parsed_cloud_url.username:
        new_netloc = "{}:{}@{}".format(
            username, password, parsed_cloud_url.netloc
        )
    cloud_url = ParseResult(
        parsed_cloud_url.scheme, new_netloc, parsed_cloud_url.path,
        parsed_cloud_url.params, parsed_cloud_url.query,
        parsed_cloud_url.fragment
    ).geturl()

    server = Server(local_url)
    for db_name in per_farm_dbs:
        remote_db_name = "{}/{}/{}".format(username, farm_name, db_name)
        server.replicate(
            db_name, db_name, urljoin(cloud_url, remote_db_name),
            continuous=True
        )
项目:tensorboard    作者:tensorflow    | 项目源码 | 文件源码
def open_url(url):
  """Send a GET request for url and return the connection's response.

  An HTTPS connection is used for the 'https' scheme and a plain HTTP
  connection for everything else.  The request line carries only the
  path, params, query and fragment of url; a User-Agent header is set from
  FLAGS.user_agent.
  """
  ru = urlparse.urlparse(url)
  # Scheme and netloc are blanked so geturl() yields the request target
  # only, not the absolute URL.
  request_target = urlparse.ParseResult(
      '', '', ru.path, ru.params, ru.query, ru.fragment).geturl()
  connection_cls = (httplib.HTTPSConnection if ru.scheme == 'https'
                    else httplib.HTTPConnection)
  c = connection_cls(ru.netloc)
  c.putrequest('GET', request_target)
  c.putheader('User-Agent', FLAGS.user_agent)
  c.endheaders()
  return c.getresponse()
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def construct_page_url(elements, params, index, limit):
    """
    Build the URL of one page of results.

    Returns None when ``params`` is empty/None or ``index`` is None.
    Otherwise the limit and offset (index * limit) entries are written into
    ``params`` — the caller's mapping is modified in place — and the URL is
    rebuilt from ``elements`` with the encoded params as its query.
    """
    if not params or index is None:
        return None

    params[RESULT_LIMIT_PARAMETER] = limit
    params[RESULT_OFFSET_PARAMETER] = index * limit
    # Second positional argument True == doseq: sequences expand to
    # repeated name=value pairs.
    query = urlencode(params, True)
    page_parts = ParseResult(
        elements.scheme, elements.netloc, elements.path,
        elements.params, query, elements.fragment,
    )
    return urlunparse(page_parts)
项目:deb-python-jingo    作者:openstack    | 项目源码 | 文件源码
def urlparams(url_, fragment=None, query_dict=None, **query):
    """
    Add a fragment and/or query parameters to a URL.

    New query params are appended to the existing parameters, except that a
    name which already exists is replaced rather than duplicated.
    ``query_dict`` entries are applied first, then the keyword arguments.
    Values that are None are left out of the resulting query string.  When
    ``fragment`` is None the URL's own fragment is kept.
    """
    parts = urlparse.urlparse(url_)
    if fragment is None:
        fragment = parts.fragment

    existing = parts.query
    if existing:
        merged = QueryDict(smart_str(existing), mutable=True)
    else:
        merged = QueryDict('', mutable=True)

    if query_dict:
        for name, values in query_dict.lists():
            merged[name] = None  # Replace, don't append.
            for value in values:
                merged.appendlist(name, value)

    for name, value in query.items():
        # Replace, don't append.
        if isinstance(value, list):
            merged.setlist(name, value)
        else:
            merged[name] = value

    # Flatten to (name, value) pairs, skipping the None placeholders.
    pairs = []
    for name, values in merged.lists():
        for value in values:
            if value is not None:
                pairs.append((name, value))
    query_string = urlencode(pairs)

    rebuilt = urlparse.ParseResult(parts.scheme, parts.netloc, parts.path,
                                   parts.params, query_string, fragment)
    return rebuilt.geturl()
项目:xss_fuzzer_console    作者:tytrusty    | 项目源码 | 文件源码
def gen_urls(p, value, target_param=''):
    """
    Generate one URL per query argument of the parsed URL ``p``, with that
    argument's value replaced by ``value``.

    When ``target_param`` is not the empty string, only the argument with
    that name is substituted.

    :return: list of (url, param) tuples, one per substituted argument
    """
    query = parse_qs(p.query.encode('utf-8'))
    url_list = []
    for param in query:
        if target_param != '' and target_param != param:
            continue
        # Work on a copy so every generated URL differs from the original
        # in exactly one argument.
        mutated = copy(query)
        mutated[param] = value
        mutated_query = urlencode(mutated, doseq=True)
        rebuilt = ParseResult(p.scheme, p.netloc, p.path, p.params,
                              mutated_query, p.fragment)
        url_list.append((rebuilt.geturl(), param))

    return url_list
项目:MTTT    作者:roxana-lafuente    | 项目源码 | 文件源码
def __init__(self, post_editing_source, post_editing_reference, notebook, grid, output_directory):
        """
        Store the constructor arguments, point a WebKit view at the
        generated statistics page, create the "translation_table" Table,
        recreate the statistics/generated directory and show the grid with
        the table's action buttons hidden.
        """
        self.post_editing_source = post_editing_source
        self.post_editing_reference = post_editing_reference
        self.translation_tab_grid = grid
        self.notebook = notebook
        self.modified_references =  []
        self.saved_modified_references = []
        self.visibility_of_statistics_menu = True
        self.output_directory = output_directory

        self.tables = {}
        self.source_log = {}
        self.HTML_view = WebKit.WebView()
        # Turn the relative stats page path into a file:// URI (resolved
        # against the current working directory) and load it in the view.
        uri = "statistics/generated/stats.html"
        uri = os.path.realpath(uri)
        uri = urlparse.ParseResult('file', '', uri, '', '', '')
        uri = urlparse.urlunparse(uri)
        self.HTML_view.load_uri(uri)
        # Keep everything from the last '/' onward (leading '/' included).
        # If there is no '/', rfind returns -1 and only the final character
        # of the string is kept.
        filename = post_editing_reference[post_editing_reference.rfind('/'):]
        # NOTE(review): the next two locals are not used in this method.
        filename_without_extension = os.path.splitext(filename)[0]
        filename_extension = os.path.splitext(filename)[1]
        self.saved_origin_filepath = self.output_directory + filename


        self.tables["translation_table"] =  Table("translation_table",self.post_editing_source,self.post_editing_reference, self.preparePostEditingAnalysis_event,self.preparePostEditingAnalysis, self.calculate_statistics_event, self.translation_tab_grid, self.output_directory)

        self.source_log_filepath = self.output_directory + '/source_log.json'


        # Start from an empty statistics/generated directory.
        # NOTE(review): this is the directory holding the stats.html whose
        # URI was passed to load_uri above.
        shutil.rmtree("./statistics/generated", ignore_errors=True)
        os.makedirs(os.path.abspath("statistics/generated"))

        # Show the whole grid, then hide the table's action buttons.
        self.translation_tab_grid.show_all()
        self.tables["translation_table"].save_post_editing_changes_button.hide()
        self.tables["translation_table"].statistics_button.hide()
        self.tables["translation_table"].insertions_statistics_button.hide()
        self.tables["translation_table"].deletions_statistics_button.hide()
        self.tables["translation_table"].time_statistics_button.hide()
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def reparse_url(parsed_url, query_params):
  """Return a ParseResult equal to parsed_url except for its query.

  The query is the urlencoded form of query_params; sequence values expand
  into repeated name=value pairs (doseq).
  """
  new_query = urllib.urlencode(query_params, doseq=True)
  return urlparse.ParseResult(
      parsed_url.scheme,
      parsed_url.netloc,
      parsed_url.path,
      parsed_url.params,
      new_query,
      parsed_url.fragment)
项目:slim    作者:avalentino    | 项目源码 | 文件源码
def make_support_link(email, appname='SLiM'):
    """
    Return a ``mailto:`` URL for ``email`` whose subject is
    "[<appname>] Support request".
    """
    # The subject is escaped with quote() and spliced into the query by
    # hand, rather than going through urlencode.
    subject = quote('[%s] Support request' % appname)
    link_parts = ParseResult('mailto', '', email, '', 'subject=' + subject, '')
    return urlunparse(link_parts)
项目:slim    作者:avalentino    | 项目源码 | 文件源码
def sqlite_uri_for(path):
    """
    Return a ``sqlite:`` URI for ``path``.

    The netloc component is deliberately the single character '/', so the
    assembled URI has extra slashes between the scheme and the path.
    """
    return urlunparse(ParseResult('sqlite', '/', path, '', '', ''))
项目:caldav    作者:python-caldav    | 项目源码 | 文件源码
def __init__(self, url):
        """
        Remember ``url`` either as an already-parsed result or as a raw value.

        A ParseResult or SplitResult is stored in ``url_parsed`` (with
        ``url_raw`` set to None); anything else is stored in ``url_raw``
        (with ``url_parsed`` set to None).
        """
        already_parsed = isinstance(url, (ParseResult, SplitResult))
        self.url_parsed = url if already_parsed else None
        self.url_raw = None if already_parsed else url
项目:caldav    作者:python-caldav    | 项目源码 | 文件源码
def objectify(self, url):
        """
        Return ``url`` as a URL object.

        None and existing URL instances are passed through unchanged; any
        other value is wrapped with ``URL(url)``.
        """
        # NOTE(review): other snippets call this as URL.objectify(x), so it
        # is presumably decorated as a classmethod in the full file — confirm.
        if url is None or isinstance(url, URL):
            return url
        return URL(url)

    # To deal with all kind of methods/properties in the ParseResult
    # class
项目:caldav    作者:python-caldav    | 项目源码 | 文件源码
def unauth(self):
        """
        Return this URL without credentials.

        When ``is_auth()`` is false the object itself is returned.
        Otherwise a new URL is built whose netloc is "hostname:port" — the
        explicit port if there is one, else 443 for https or 80 for http —
        and whose path has each '//' collapsed to '/'.
        """
        if not self.is_auth():
            return self

        default_ports = {'https': 443, 'http': 80}
        host = self.hostname
        # The default-port lookup only happens when no explicit port is set.
        port = self.port or default_ports[self.scheme]
        bare_netloc = '%s:%s' % (host, port)
        return URL.objectify(ParseResult(
            self.scheme, bare_netloc, self.path.replace('//', '/'),
            self.params, self.query, self.fragment))
项目:caldav    作者:python-caldav    | 项目源码 | 文件源码
def join(self, path):
        """
        assumes this object is the base URL or base path.  If the path
        is relative, it should be appended to the base.  If the path
        is absolute, it should be added to the connection details of
        self.  If the path already contains connection details and the
        connection details differ from self, raise an error.

        An empty/None path returns self.  The params, query and fragment
        of the result are taken from ``path``.

        :raises ValueError: when scheme, hostname or port are set on both
            sides and differ
        """
        pathAsString = str(path)
        if not path or not pathAsString:
            return self
        path = URL.objectify(path)
        if (
            (path.scheme and self.scheme and path.scheme != self.scheme) or
            (path.hostname and self.hostname and
             path.hostname != self.hostname) or
            (path.port and self.port and path.port != self.port)
        ):
            raise ValueError("%s can't be joined with %s" % (self, path))

        # startswith() instead of path.path[0]: a URL without any path
        # component (e.g. only host or only query) used to raise IndexError
        # here; it is now treated as a relative, empty path.
        if path.path.startswith('/'):
            ret_path = uc2utf8(path.path)
        else:
            sep = "" if self.path.endswith("/") else "/"
            ret_path = "%s%s%s" % (self.path, sep, uc2utf8(path.path))
        return URL(ParseResult(
            self.scheme or path.scheme, self.netloc or path.netloc, ret_path,
            path.params, path.query, path.fragment))
项目:environs    作者:sloria    | 项目源码 | 文件源码
def test_url_cast(self, set_env, env):
        # env.url() should return a parsed URL object, not the raw string.
        set_env({'URL': 'http://stevenloria.com/projects/?foo=42'})
        assert isinstance(env.url('URL'), urlparse.ParseResult)
项目:wmt16-document-alignment-task    作者:christianbuck    | 项目源码 | 文件源码
def strip_uri(self, uri, expected_language=None,
                  remove_index=False):
        """Strip the path and query of uri and rebuild it as an http URL.

        Returns (stripped_uri, success).  ('', False) is returned when
        ``expected_language`` is given but matches neither the path nor the
        query, or when the URI has no host.  Otherwise success is True when
        the rebuilt URI differs from the input.

        With ``remove_index`` set and an empty stripped query, a final path
        segment starting with 'index' is dropped.  The rebuilt URI keeps the
        original netloc and has no params or fragment.
        """
        parts = urlparse.urlparse(uri)
        original_path = parts.path

        found_languages = [self.match(original_path),
                           self.match(parts.query)]
        if expected_language is not None \
                and expected_language not in found_languages:
            # Whatever could be removed does not indicate the language we
            # hoped for, e.g. /fr/ when looking for Italian pages.
            return '', False

        new_path = self.strip_path(original_path)

        # repair some stripping artifacts (order matters)
        cleanups = (
            (r'//+', '/'),
            (r'__+', '_'),
            (r'/_+', '/'),
            (r'_/', '/'),
            (r'--+', '-'),
        )
        for pattern, replacement in cleanups:
            new_path = re.sub(pattern, replacement, new_path)

        # drop a trailing / that stripping introduced
        if new_path.endswith('/') and original_path \
                and not original_path.endswith('/'):
            new_path = new_path[:-1]

        # restore a trailing / that stripping removed
        if original_path.endswith('/') and not new_path.endswith('/'):
            new_path += '/'

        new_query = self.strip_query(parts.query)

        # remove index files from tail of path if query empty
        if remove_index and not new_query:
            segments = new_path.split('/')
            if segments[-1].startswith('index'):
                new_path = '/'.join(segments[:-1])

        # The host (netloc without userinfo and port) is only used to check
        # that there is one; the rebuilt URI uses the full original netloc.
        host = parts.netloc
        if '@' in host:
            host = host.split('@')[1]
        if ':' in host:
            host = host.split(':')[0]
        if not host:
            return '', False

        stripped_uri = urlparse.ParseResult(
            'http', parts.netloc, new_path, '', new_query, '').geturl()

        return stripped_uri, stripped_uri != uri
项目:swarm    作者:a7vinx    | 项目源码 | 文件源码
def _parse_url(self,dst,src):
        """
        Check whether target url 'dst' is in the same domain (including port)
        as url 'src', and convert it into a complete url without params or
        fragment.

        A 'dst' without a netloc is resolved against the directory of
        'src'.  A 'dst' with a netloc but no port gets the default port of
        its scheme (80 for http, 443 for https; for a scheme-less dst the
        scheme of 'src' decides) before the comparison with the netloc of
        'src'.

        Returns:
            String of the complete url, with its query if it has one. If the
            target url is not in the same domain, return ''.
        """
        LOG.debug('detecting url: '+dst)
        s_parsed = urlparse.urlparse(src)
        s_scheme = s_parsed.scheme
        s_netloc = s_parsed.netloc
        s_cur_dir = s_parsed.path
        # endswith() instead of s_cur_dir[-1]: a src without any path
        # (e.g. "http://host:80") used to raise IndexError here.
        if not s_cur_dir.endswith('/'):
            s_cur_dir = '/'.join(s_cur_dir.split('/')[:-1])
        else:
            s_cur_dir = s_cur_dir[:-1]

        d_parsed = urlparse.urlparse(dst)
        d_scheme = d_parsed.scheme
        if d_parsed.netloc.find(':') == -1 and d_parsed.netloc != '':
            if d_scheme == 'http':
                d_netloc = d_parsed.netloc+':80'
            elif d_scheme == 'https':
                d_netloc = d_parsed.netloc+':443'
            elif d_scheme == '':
                d_netloc = d_parsed.netloc+':80' if s_scheme=='http' else d_parsed.netloc+':443'
            else:
                d_netloc = d_parsed.netloc
        else:
            d_netloc = d_parsed.netloc
        # add '/' as prefix if the path does not start with '/'
        if d_parsed.path != '':
            d_path = '/'+d_parsed.path if d_parsed.path[0]!='/' else d_parsed.path
        else:
            d_path = '/'
        d_query = d_parsed.query

        # if it is a relative url
        # NOTE(review): a dst path that already starts with '/' is also
        # appended to the current directory of src here — confirm intended.
        if d_netloc == '':
            return urlparse.ParseResult(s_scheme,s_netloc,s_cur_dir+d_path,'',d_query,'').geturl()
        elif d_netloc == s_netloc and (d_scheme == s_scheme or d_scheme == ''):
            return urlparse.ParseResult(s_scheme,s_netloc,d_path,'',d_query,'').geturl()
        else:
            return ''
项目:litchi    作者:245967906    | 项目源码 | 文件源码
def post(self, request):
        """
        Create a user from the submitted UserCreateForm and email them a
        signed link to the 'core:SetPassword' view.

        An invalid form, or an IntegrityError while creating the user,
        re-renders 'users/user_create.html' (with the form, or with an
        error message, respectively).  On success the user is added to the
        selected groups, the email is sent (failures are silenced) and the
        response redirects to 'user:UserList'.
        """
        form = UserCreateForm(request.POST)
        if not form.is_valid():
            groups = UserGroup.objects.only('id', 'name')
            context = dict(
                groups=groups,
                role_types=UserRoleType.attrs,
                status_types=UserStatusType.attrs,
                form=form,
            )
            return render(request, 'users/user_create.html', context)

        email = form.cleaned_data.get('email')
        username = form.cleaned_data.get('username')
        is_active = form.cleaned_data.get('is_active')
        role = form.cleaned_data.get('role')
        groups = form.cleaned_data.get('groups')
        try:
            user = User.objects.create_user(
                email=email,
                username=username,
                is_active=is_active,
                role=role,
            )
        except IntegrityError:
            # Creation violated a database constraint: show the form again
            # with an error message.
            context = dict(
                error_msg='???????????',
                groups=UserGroup.objects.only('id', 'name'),
                role_types=UserRoleType.attrs,
                status_types=UserStatusType.attrs,
            )
            return render(request, 'users/user_create.html', context)

        user.groups.add(*groups)
        # NOTE(review): unkeyed MD5 over email + SECRET_KEY; whatever checks
        # this signature must compute it the same way.
        sign = hashlib.md5(email + settings.SECRET_KEY).hexdigest()

        host = urlparse.urlparse(request.get_raw_uri()).netloc
        target_path = reverse(('core:SetPassword'))
        query_string = urllib.urlencode({'email': email, 'sign': sign})
        url = urlparse.ParseResult(
            request.scheme, host, target_path, '', query_string, '',
        ).geturl()

        html_body = get_template('users/user_email_activate.html').render({'url': url})
        msg = EmailMultiAlternatives(
            subject='??????',
            body=html_body,
            from_email=settings.EMAIL_HOST_USER,
            to=[email,],
        )
        msg.content_subtype = 'html'
        msg.send(fail_silently=True)
        return HttpResponseRedirect(reverse('user:UserList'))
项目:baiji    作者:bodylabs    | 项目源码 | 文件源码
def parse(s):
    '''
    Parse a path given as a url. Accepts strings of the form:

       s3://bucket-name/path/to/key
       file:///path/to/file
       /absolute/path/to/file
       relative/path/to/file
       ~/path/from/home/dir/to/file

       To avoid surprises, s3:// and file:// URLs should not
       include ;, ? or #. You should URL-encode such paths.

    Return value is a ParseResult; one of the following:

       ('s3', bucketname, valid_s3_key, ...)
       ('file', '', absolute_path_for_current_filesystem, ...)

    The params, query and fragment fields of the result are always None.

    Raises ValueError when s is not a string, contains ;, ? or #, has a
    path that cannot be encoded as UTF-8, or is an s3 url without a bucket.

    '''
    import re
    from urlparse import urlparse, ParseResult
    # NOTE(review): `os` is used below but not imported in this function;
    # presumably it is imported at module level.

    if not isinstance(s, basestring):
        raise ValueError("An S3 path must be a string, got %s" % s.__class__.__name__)

    # A drive letter ("C:...") would otherwise be parsed as a url scheme,
    # so such strings bypass urlparse entirely.
    is_windows_path = (len(s) >= 2 and s[1] == ':')
    if is_windows_path:
        # params/query/fragment stay unbound on this branch; they are not
        # read again after the else-branch below.
        scheme, netloc, s3path = 'file', '', s
    else:
        scheme, netloc, s3path, params, query, fragment = urlparse(s)
        if any([params, query, fragment]):
            raise ValueError("Invalid URI: %s" % s)
        # Also catches a bare delimiter whose component parsed as empty,
        # e.g. a trailing '?'.
        if any(char in ';?#' for char in s):
            raise ValueError("Invalid URI: %s" % s)
        try:
            s3path.encode('UTF-8')
        except (UnicodeDecodeError, UnicodeEncodeError):
            raise ValueError("Invalid URI (bad unicode): %s" % s)
            # If somehow something ever gets uploaded with binary in the
            # key, this seems to be the only way to fix it:
            # `s3cmd fixbucket s3://bodylabs-korper-assets`
    if re.match(r'/\w:', s3path): # urlparse, given file:///C:\foo parses us to /C:\foo, so on reconstruction (on windows) we get C:\C:\foo.
        s3path = s3path[1:]
        is_windows_path = True
    # A string without a scheme is treated as a local file path.
    if scheme == '':
        scheme = 'file'
    if scheme == 'file' and not is_windows_path:
        if s3path.endswith(os.sep) or s3path.endswith('/'):
            # os.path.abspath strips the trailing '/' so we need to put it back
            s3path = os.path.join(os.path.abspath(os.path.expanduser(s3path)), '')
        else:
            s3path = os.path.abspath(os.path.expanduser(s3path))
    if scheme == 's3' and netloc == '':
        raise ValueError('s3 urls must specify the bucket')
    return ParseResult(scheme, netloc, s3path, params=None, query=None, fragment=None) # pylint: disable=too-many-function-args,unexpected-keyword-arg
项目:baiji    作者:bodylabs    | 项目源码 | 文件源码
def join(base, *additions):
    '''
    Extends os.path.join to work with s3:// and file:// urls.

    The additions are first joined with `sep` into a single path, which
    must be an absolute or relative path, not a URL (ValueError otherwise).

    For a local base the result comes from os.path.join on the parsed base
    path, with `sep` in the addition replaced by os.sep.  This inherits a
    quirk of os.path.join: if the addition is an absolute path, path
    components of base are thrown away.

    For any other base the paths are combined with urljoin and the full
    url is returned.

    '''
    from urlparse import urlparse, urljoin, ParseResult

    suffix = sep.join(additions)

    # Only the path component (index 2) may be non-empty.
    pieces = urlparse(suffix)
    if pieces[0] or pieces[1] or pieces[3] or pieces[4] or pieces[5]:
        raise ValueError('Addition must be an absolute or relative path, not a URL')

    if islocal(base):
        return os.path.join(parse(base).path, suffix.replace(sep, os.sep))
    remote = parse(base)

    # urljoin is used instead of os.path.join because it always works with
    # '/', whereas os.sep is '\' on Windows.  The two disagree on a base
    # without a trailing separator:
    #
    #   >>> os.path.join('foo/bar', 'baz')
    #   'foo/bar/baz'
    #   >>> urlparse.urljoin('foo/bar', 'baz')
    #   'foo/baz'
    #
    # The os.path.join behaviour is preferred, so make sure the base path
    # ends with a separator before joining.
    if remote.path.endswith(sep):
        base_path = remote.path
    else:
        base_path = remote.path + sep
    joined_path = urljoin(base_path, suffix)

    return ParseResult(remote.scheme, remote.netloc, joined_path, remote.params, remote.query, remote.fragment).geturl() # pylint: disable=too-many-function-args,unexpected-keyword-arg
项目:ws-cli    作者:hack4sec    | 项目源码 | 文件源码
def prepare_links_for_insert(links, url, site):
        """ Parse, filter and normalise raw links, then add their parent directories.

        Empty links and links without scheme, netloc, path and query are
        skipped.  Links to another host (ignoring a 'www.' prefix on either
        side) are skipped too, and that host is recorded in
        SpiderCommon._external_hosts.  The remaining links are cleaned and
        made absolute relative to url.path.

        Returns the kept links followed by one path-only ParseResult for
        every parent directory of every kept link.
        (Per the original note, the result is meant for insertion into MongoDB.)
        """
        links_to_insert = []
        for raw_link in links:
            if not raw_link:
                continue

            link = urlparse(raw_link)

            if not (link.scheme or link.netloc or link.path or link.query):
                continue

            host = link.netloc
            if host \
                and host != site \
                and 'www.' + host != site \
                and host != 'www.' + site:
                SpiderCommon._external_hosts.append(host)
                continue

            link = SpiderCommon.clear_link(link)
            link = SpiderCommon.build_path(link, url.path)
            link = SpiderCommon.clear_link(link)

            links_to_insert.append(link)

        separated_links = []
        for link in links_to_insert:
            segments = link.path.split("/")
            # Walk from the deepest parent directory up to the root.
            for depth in range(len(segments) - 1, 0, -1):
                separated_links.append(
                    ParseResult('', '', "/".join(segments[:depth]) + '/',
                                '', '', '')
                )
        return links_to_insert + separated_links