The following 50 code examples, extracted from Python open-source projects, illustrate how urllib.parse.urlsplit() is used in practice.
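Before the project examples, here is a minimal standalone sketch (the URL is invented for illustration) of what urlsplit() returns and how its pieces are typically combined with parse_qs() and urlunsplit():

from urllib.parse import urlsplit, urlunsplit, parse_qs

parts = urlsplit("https://example.com:8080/a/b.html?x=1&y=2#frag")
# SplitResult(scheme='https', netloc='example.com:8080', path='/a/b.html',
#             query='x=1&y=2', fragment='frag')
print(parts.scheme, parts.netloc, parts.path, parts.query, parts.fragment)
print(parts.hostname, parts.port)   # 'example.com' 8080
print(parse_qs(parts.query))        # {'x': ['1'], 'y': ['2']}
print(urlunsplit(parts))            # reassembles the original URL

Unlike urlparse(), urlsplit() does not separate the rarely used path parameters, so it returns a 5-tuple rather than a 6-tuple; that is why the examples below index the result as split[0]..split[4] or use the named attributes.
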
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y
    :param no_query:
    :type full_url: str
    :param full_url: full url
    :return: str
    """
    if full_url is None:
        full_url = request.url
    split = urlsplit(full_url)
    result = split.path or "/"
    if not no_query and split.query:
        result += '?' + split.query
    return result

# ################# End Client Request Handler #################

# ################# Begin Middle Functions #################

def change_locale(request):
    """
    Redirect to a given url while changing the locale in the path.
    The url and the locale code need to be specified in the request
    parameters.
    """
    next = request.REQUEST.get('next', None)
    if not next:
        referrer = request.META.get('HTTP_REFERER', None)
        if referrer:
            next = urlsplit(referrer)[2]
    if not next:
        next = '/'
    _, path = utils.strip_path(next)
    if request.method == 'POST':
        locale = request.POST.get('locale', None)
        if locale and check_for_language(locale):
            if localeurl_settings.USE_SESSION:
                request.session['django_language'] = locale
            path = utils.locale_path(path, locale)
    response = http.HttpResponseRedirect(path)
    return response

def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException(
            'expected a string in the form '
            '"alt://port[?option[=value][&option[=value]]]": '
            'not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException(
            'expected a string in the form '
            '"alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "socket":
        raise SerialException(
            'expected a string in the form '
            '"socket://<host>:<port>[?logging={debug|info|warning|error}]": '
            'not starting with socket:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.socket')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
        # get host and port
        host, port = parts.hostname, parts.port
        if not 0 <= port < 65536:
            raise ValueError("port not in range 0...65535")
    except ValueError as e:
        raise SerialException(
            'expected a string in the form '
            '"socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
    return (host, port)

# - - - - - - - - - - - - - - - - - - - - - - - -

def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={debug|info|warning|error}]": '
            'not starting with loop:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={debug|info|warning|error}]": %s' % e)

# - - - - - - - - - - - - - - - - - - - - - - - -

def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException(
            'expected a string in the form '
            '"spy://port[?option[=value][&option[=value]]]": '
            'not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    formatter = FormatHexdump
    color = False
    output = sys.stderr
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                output = open(values[0], 'w')
            elif option == 'color':
                color = True
            elif option == 'raw':
                formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException(
            'expected a string in the form '
            '"spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = formatter(output, color)
    return ''.join([parts.netloc, parts.path])

def can_view_parent_source(self, url_data):
    """Determine if parent URL source can be retrieved."""
    if not url_data.valid:
        return False
    parent = url_data.parent_url
    if not parent:
        return False
    # Directory contents are dynamically generated, so it makes
    # no sense in viewing/editing them.
    if parent.startswith(u"file:"):
        path = urlparse.urlsplit(parent)[2]
        return not os.path.isdir(get_os_filename(path))
    if parent.startswith((u"ftp:", u"ftps:")):
        path = urlparse.urlsplit(parent)[2]
        return bool(path) and not path.endswith(u'/')
    # Only HTTP left
    return parent.startswith((u"http:", u"https:"))

def crawl():
    try:
        depth_limit = int(request.values['depth'])
    except ValueError:
        return "Depth parameter must be a number", 400
    except:
        depth_limit = 1

    if 'url' in request.values:
        url = request.values['url']
        parsed_url = urlparse.urlsplit(url)
        if parsed_url.scheme not in ['http', 'https']:
            return "Only http and https protocols are supported", 400
        if parsed_url.netloc == '':
            return "Missing domain", 400
        allowed_domains = [parsed_url.netloc]
        crawler = Crawler(allowed_domains, depth_limit)
        crawler.crawl(url)
        return jsonify(**crawler.crawled)
    else:
        return "Missing url parameter", 400

def upload(url, filename=None):
    from urllib.request import Request, urlopen
    from urllib.parse import urlsplit
    import shutil

    def getFilename(url, openUrl):
        if 'Content-Disposition' in openUrl.info():
            # If the response has Content-Disposition, try to get filename from it
            cd = dict([x.strip().split('=') if '=' in x else (x.strip(), '')
                       for x in openUrl.info()['Content-Disposition'].split(';')])
            if 'filename' in cd:
                fname = cd['filename'].strip("\"'")
                if fname:
                    return fname
        # if no filename was found above, parse it out of the final URL.
        return os.path.basename(urlsplit(openUrl.url)[2])

    r = urlopen(Request(url))
    success = None
    try:
        filename = filename or "/tmp/%s" % getFilename(url, r)
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r, f)
        success = filename
    finally:
        r.close()
    return success

def url_to_path_and_args(url, no_query_string=False):
    if no_query_string:
        url = url.replace('?', '%3F').replace('#', '%23')
    components = urlsplit(url)
    path = components.path
    if no_query_string:
        path = unquote(path)
        # CEIBA is inconsistent about how '?' and '#' are quoted (%3F vs %253F),
        # so re-escape them if the unquoted path is not already safely quotable
        # (see ceiba_dl.Request for how the resulting path is used).
        quote_test = path.replace('?', '').replace('#', '').replace(' ', '')
        if quote(quote_test) != quote_test:
            path = path.replace('?', '%3F').replace('#', '%23')
        args = {}
    else:
        query_string = components.query
        args = parse_qs(query_string, keep_blank_values=True)
        for key, value in args.items():
            if isinstance(value, list):
                assert len(value) == 1
                args[key] = value[0]
    return (path, args)

def url_join(*parts, **kwargs):
    """
    Normalize url parts and join them with a slash.
    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        result = []
        for path in sequence:
            result.append(path)
            if path.startswith('/'):
                break
        return '/'.join(reversed(result))

    schemes, netlocs, paths, queries, fragments = zip(
        *(urlsplit(part) for part in reversed(parts)))
    scheme = next((x for x in schemes if x), kwargs.get('scheme', 'http'))
    netloc = next((x for x in netlocs if x), '')
    path = concat_paths(paths)
    query = queries[0]
    fragment = fragments[0]
    return urlunsplit((scheme, netloc, path, query, fragment))

def do_GET(self):
    # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e
    qs = urllib_parse.urlsplit(self.path).query
    url_vars = urllib_parse.parse_qs(qs)

    oauth_token = url_vars['oauth_token'][0]
    oauth_verifier = url_vars['oauth_verifier'][0]
    if six.PY2:
        self.server.oauth_token = oauth_token.decode('utf-8')
        self.server.oauth_verifier = oauth_verifier.decode('utf-8')
    else:
        self.server.oauth_token = oauth_token
        self.server.oauth_verifier = oauth_verifier

    assert isinstance(self.server.oauth_token, six.string_types)
    assert isinstance(self.server.oauth_verifier, six.string_types)

    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()
    self.wfile.write(html.auth_okay_html)

def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={debug|info|warning|error}]": not starting '
            'with loop:// ({!r})'.format(parts.scheme))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: {!r}'.format(option))
    except ValueError as e:
        raise SerialException(
            'expected a string in the form '
            '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

# - - - - - - - - - - - - - - - - - - - - - - - -

async def _handle_solo_scout(self):
    """
    Handles a solo scout
    :return: Path of scout image
    """
    card = await self._scout_cards()

    # Send error message if no card was returned
    if not card:
        self.results = []
        return None

    card = card[0]
    if card["card_image"] is None:
        url = "http:" + card["card_idolized_image"]
    else:
        url = "http:" + card["card_image"]

    fname = basename(urlsplit(url).path)
    image_path = idol_img_path.joinpath(fname)
    bytes_ = await get_one_img(
        url, image_path, self._bot.session_manager)
    return ScoutImage(bytes_, fname)

def determine_ftp_filename(host, furl) -> (bool, str):
    try:
        fsize = host.path.getsize(urlsplit(furl).path)
        fname = os.path.basename(urlsplit(furl).path)
        while True:
            if not os.path.exists(dlDir + fname):
                return True, fname      # needDownload=True
            elif os.path.getsize(dlDir + fname) == fsize:   # same name same size
                return False, fname     # needDownload=False
            # same name different size, change name by appending "_1"
            ftitle, fext = os.path.splitext(fname)
            m = re.search(r'(.+)_(\d+)', ftitle)
            if m:
                ftitle = '%s_%s' % (m.group(1), int(m.group(2)) + 1)
                fname = ftitle + fext
            else:
                fname = ftitle + '_1' + fext
    except BaseException as ex:
        traceback.print_exc()

def test_submission_form_copy(self):
    '''Tests if a submissionform can be copied. Compares initial version
    against copied version.
    '''
    submission_form = create_submission_form(presenter=self.user)
    response = self.client.get(reverse(
        'ecs.core.views.submissions.copy_latest_submission_form',
        kwargs={'submission_pk': submission_form.submission.pk}))
    self.assertEqual(response.status_code, 302)
    url = reverse('ecs.core.views.submissions.copy_submission_form',
                  kwargs={'submission_form_pk': submission_form.pk})
    self.assertEqual(url, urlsplit(response['Location']).path)
    response = self.client.get(url)
    self.assertEqual(response.status_code, 302)
    target_url = response['Location']
    response = self.client.get(target_url)
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.context['form'].initial.get('project_title'),
                     submission_form.project_title)

def check_cors(origin):
    url = urlsplit(origin)
    if current_app.config.get('CORS_SECURE'):
        if url.scheme != 'https':
            return False
    hp = url.netloc.split(':')
    host = hp[0]
    port = int(hp[1]) if len(hp) > 1 else 443 if url.scheme == 'https' else 80
    if current_app.config.get('CORS_HOSTS') != '*' and \
            host not in current_app.config.get('CORS_HOSTS', []):
        return False
    allowed_ports = current_app.config.get('CORS_PORTS')
    if allowed_ports and isinstance(allowed_ports, tuple) and \
            (port < allowed_ports[0] or port > allowed_ports[1]):
        return False
    elif allowed_ports and isinstance(allowed_ports, list) and port not in allowed_ports:
        return False
    return True

def append_query_params(self, url, **kwargs):
    uri = urlsplit(url)
    query = parse_qs(uri.query)
    for key in kwargs:
        if key in query:
            query[key].append(kwargs[key])
        else:
            query[key] = kwargs[key]
    query_string = urlencode(query, doseq=True)
    uri_new = uri._replace(query=query_string)
    return urlunsplit(uri_new)

def test_get_without_verify_token(self):
    session = self.client.session
    session[SESSKEY_OAUTH_NEXT_URI] = "/test?type=set"
    session.save()

    getData = {
        "code": "code_will_not_be_checked_anyway"
    }
    res = self.client.get(reverse("instagram:connect"), getData)

    self.assertEqual(302, res.status_code)
    redirect_uri = urlsplit(res['Location'])
    self.assertEqual("/test", redirect_uri.path)
    self.assertDictEqual(
        {"status": ["error"], "type": ["set", "api"], "detail": ["verify_token_not_set"]},
        parse_qs(redirect_uri.query))

def test_get_with_no_verify_token_in_session(self):
    session = self.client.session
    session[SESSKEY_OAUTH_NEXT_URI] = "/test"
    session.save()

    getData = {
        "code": "code_will_not_be_checked_anyway",
        "verify_token": "token_will_not_be_checked_anyway"
    }
    res = self.client.get(reverse("instagram:connect"), getData)

    self.assertEqual(302, res.status_code)
    redirect_uri = urlsplit(res['Location'])
    self.assertEqual("/test", redirect_uri.path)
    self.assertDictEqual(
        {"status": ["error"], "type": ["internal"], "detail": ["no_verify_token_in_session"]},
        parse_qs(redirect_uri.query))

def test_get_with_valid_code_and_invalid_verify_token(self):
    sessionValue = "correctvalue"
    session = self.client.session
    session[SESSKEY_OAUTH_NEXT_URI] = "/test"
    session[SESSKEY_OAUTH_VERIFY_TOKEN] = sessionValue
    session.save()

    getData = {
        "verify_token": "someothervaluethaninthesession",
        "code": "code_will_not_be_checked_anyway"
    }
    res = self.client.get(reverse("instagram:connect"), getData)

    self.assertEqual(302, res.status_code)
    redirect_uri = urlsplit(res['Location'])
    self.assertEqual("/test", redirect_uri.path)
    self.assertDictEqual(
        {"status": ["error"], "type": ["api"], "detail": ["invalid_verify_token"]},
        parse_qs(redirect_uri.query))

def get_gramet_image_url(url_or_fp):
    img_src = ''
    if isinstance(url_or_fp, io.IOBase):
        # noinspection PyUnresolvedReferences
        data = url_or_fp.read()
        u = urlsplit(OGIMET_URL)
    else:
        u = urlsplit(url_or_fp)
        import requests
        r = requests.get(url_or_fp)
        data = r.text
    if data:
        m = re.search(r'<img src="([^"]+/gramet_[^"]+)"', data)
        if m:
            img_src = "{url.scheme}://{url.netloc}{path}".format(
                url=u, path=m.group(1))
    return img_src

def test_adds_other_supplied_values_as_query_string():
    app = Sanic('passes')

    @app.route(COMPLEX_PARAM_URL)
    def passes():
        return text('this should pass')

    new_kwargs = dict(PASSING_KWARGS)
    new_kwargs['added_value_one'] = 'one'
    new_kwargs['added_value_two'] = 'two'

    url = app.url_for('passes', **new_kwargs)
    query = dict(parse_qsl(urlsplit(url).query))
    assert query['added_value_one'] == 'one'
    assert query['added_value_two'] == 'two'

async def connect(self):
    if self.connected:
        raise Exception("Already connected!")

    transport_class = self.TRANSPORTS[urlsplit(self.url).scheme]
    self.transport = transport_class(
        self.url, self.params, self._incoming, self._outgoing
    )

    transport_task = asyncio.ensure_future(self.transport.run())
    await self.transport.ready
    self._transport_task = transport_task

    self._done_recv = asyncio.Future()
    self._recv_task = asyncio.ensure_future(self._recv_loop())

    # TODO: Ok, so this is cool - but how to tell if our transport_task has
    # failed.
    self.connected = True

def analyze_file(name, f, verbose=False):
    urls = []
    Doc = namedtuple('Doc', ['item', 'min_hash'])
    documents = {}  # key -> Doc
    lsh = MinHashLSH(threshold=0.9, num_perm=128)
    too_common = get_too_common_shingles(f, name, limit=300)
    for i, item in enumerate(item_reader(f, name)):
        urls.append(item['url'])
        min_hash = get_min_hash(item['extracted_text'], too_common)
        key = 'item_{}'.format(i)
        item = {'url': item['url']}
        documents[key] = Doc(item, min_hash)
        if key in lsh:
            lsh.remove(key)
        lsh.insert(key, min_hash)
    paths = [''.join([p.netloc, p.path]) for p in map(urlsplit, urls)]
    duplicates = get_duplicates(lsh, documents, verbose=verbose)
    print(name.ljust(40), '\t'.join(map(str, [
        len(urls), len(set(urls)), len(set(paths)),
        n_unique(documents, duplicates),
    ])))

def media_request(self, url):
    kwargs = dict(
        url=url,
        priority=-2,
        meta={'download_slot': (
            '{} documents'.format(urlsplit(url).netloc)),
        },
    )
    if using_splash(self.crawler.settings):
        return SplashRequest(
            endpoint='execute',
            args={'lua_source': self.lua_source},
            slot_policy=SlotPolicy.SCRAPY_DEFAULT,
            **kwargs)
    else:
        return Request(**kwargs)

def sign_request(self, api_key, prepared_request):
    url = urlsplit(prepared_request.path_url)
    path = bytes(url.path, 'utf8')
    if url.query:
        path += bytes("?{}".format(url.query), 'utf8')
    salt = bytes(api_key, 'utf8')
    body = prepared_request.body or b""
    if isinstance(body, str):
        body = bytes(body, 'utf8')
    signature = sha256(path + body + salt).hexdigest()
    prepared_request.headers["X-Signature"] = signature
    return prepared_request

def download_file(url, binary=True):
    if sys.version_info < (3,):
        from urlparse import urlsplit
        import urllib2
        request = urllib2
        error = urllib2
    else:
        from urllib.parse import urlsplit
        from urllib import request, error

    filename = os.path.basename(urlsplit(url)[2])
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    path = os.path.join(data_dir, filename)
    if os.path.exists(path):
        return path
    try:
        data = request.urlopen(url, timeout=15).read()
        with open(path, 'wb' if binary else 'w') as f:
            f.write(data)
        return path
    except error.URLError:
        msg = "could not download test file '{}'".format(url)
        warnings.warn(msg, RuntimeWarning)
        raise unittest.SkipTest(msg)

def merge_url_qs(url: str, **kw) -> str:
    """Merge the query string elements of a URL with the ones in ``kw``.

    If any query string element exists in ``url`` that also exists in ``kw``,
    replace it.

    :param url: An URL.
    :param kw: Dictionary with keyword arguments.
    :return: An URL with keyword arguments merged into the query string.
    """
    segments = urlsplit(url)
    extra_qs = [
        (k, v)
        for (k, v) in parse_qsl(segments.query, keep_blank_values=1)
        if k not in kw
    ]
    qs = urlencode(sorted(kw.items()))
    if extra_qs:
        qs += '&' + urlencode(extra_qs)
    return urlunsplit((segments.scheme, segments.netloc, segments.path, qs,
                       segments.fragment))

def get_mgtv_real_url(url):
    """str->list of str
    Give you the real URLs."""
    content = loads(get_content(url))
    m3u_url = content['info']
    split = urlsplit(m3u_url)

    base_url = "{scheme}://{netloc}{path}/".format(scheme=split[0],
                                                   netloc=split[1],
                                                   path=dirname(split[2]))

    content = get_content(content['info'])  # get the REAL M3U url, maybe to be changed later?
    segment_list = []
    segments_size = 0
    for i in content.split():
        if not i.startswith('#'):  # not the best way, better we use the m3u8 package
            segment_list.append(base_url + i)
        # use ext-info for fast size calculate
        elif i.startswith('#EXT-MGTV-File-SIZE:'):
            segments_size += int(i[i.rfind(':') + 1:])
    return m3u_url, segments_size, segment_list

def serialize(url='', data={}):
    """ Returns a URL with a query string of the given data.
    """
    p = urlparse.urlsplit(url)
    q = urlparse.parse_qsl(p.query)
    q.extend((b(k), b(v)) for k, v in sorted(data.items()))
    q = urlencode(q, doseq=True)
    p = p.scheme, p.netloc, p.path, q, p.fragment
    s = urlparse.urlunsplit(p)
    s = s.lstrip('?')
    return s

# print(serialize('http://www.google.com', {'q': 'cats'}))
# http://www.google.com?q=cats

#---- REQUESTS & STREAMS --------------------------------------------------------------------------
# The download(url) function returns the HTML (JSON, image data, ...) at the given url.
# If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).

def safe_url(url, remove_empty_query=True):
    scheme, netloc, path, query, fragment = urlsplit(url)
    if not query:
        return url.rstrip('/')

    # Sort all the queries
    queries = []
    for q in query.split('&'):
        if '=' not in q:
            return url
        key, value = q.split('=')
        if remove_empty_query and not value:
            continue
        queries.append((key, value))
    queries.sort(key=lambda x: x[0])
    query = urlencode(queries)
    return urlunsplit((scheme, netloc, path, query, fragment)).rstrip('/')

def __call__(self, value):
    try:
        super(URLValidator, self).__call__(value)
    except ValidationError as e:
        # Trivial case failed. Try for possible IDN domain
        if value:
            value = text_type(value)
            scheme, netloc, path, query, fragment = urlsplit(value)
            try:
                # IDN -> ACE
                netloc = netloc.encode('idna').decode('ascii')
            except UnicodeError:
                # invalid domain part
                raise ValidationError(self.message.format(value), code=self.code)
            url = urlunsplit((scheme, netloc, path, query, fragment))
            return super(URLValidator, self).__call__(url)
        else:
            raise ValidationError(self.message.format(value), code=self.code)
    return value

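As context for the IDN branch in the validator above, here is a small standalone sketch (the domain is invented for illustration) of the idna codec round-trip it relies on to turn a Unicode netloc into its ASCII-compatible (Punycode) form before re-validating:

from urllib.parse import urlsplit, urlunsplit

value = "http://bücher.example.com/katalog?seite=1"
scheme, netloc, path, query, fragment = urlsplit(value)
netloc = netloc.encode('idna').decode('ascii')   # IDN -> ACE
print(netloc)                                    # xn--bcher-kva.example.com
print(urlunsplit((scheme, netloc, path, query, fragment)))
# http://xn--bcher-kva.example.com/katalog?seite=1
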
def test_batch_softmax_high_prob(server, priority=10000):
    q = make_queue(server, BatchSoftmaxQueue, settings={'QUEUE_BATCH_SIZE': 50})
    for domain_n in range(100):
        for url_n in range(5):
            q.push(Request(
                url='http://domain-{}.com/{}'.format(domain_n, url_n),
                priority=priority if (domain_n in [42, 43] and url_n == 1) else 0,
            ))
    res = q.pop_multi()
    urls = {r.url for r in res}
    assert 'http://domain-42.com/1' in urls
    assert 'http://domain-43.com/1' in urls
    assert len({urlsplit(r.url).netloc for r in res}) > 10
    assert len(res) == 50

# FIXME - broken in ebd4cb651050fcdae5427383f3d07b094f853155
# TODO - add a test for the infinite loop fixed in ^^