We have extracted the following 50 code examples from open source Python projects to illustrate how to use urllib.parse.urljoin().
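Before the project examples, here is a minimal standalone sketch of how urllib.parse.urljoin() resolves a relative reference against a base URL (the URLs below are illustrative only):

from urllib.parse import urljoin

# A trailing slash keeps the last path segment as a directory.
print(urljoin('https://example.com/docs/', 'intro.html'))            # https://example.com/docs/intro.html

# Without a trailing slash, the last path segment is replaced.
print(urljoin('https://example.com/docs/index.html', 'intro.html'))  # https://example.com/docs/intro.html

# A reference starting with '/' is resolved from the host root.
print(urljoin('https://example.com/docs/index.html', '/api/v1'))     # https://example.com/api/v1

# An absolute URL replaces the base entirely.
print(urljoin('https://example.com/docs/', 'https://other.org/x'))   # https://other.org/x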
def parse(self, response): """ 1. ???????????url???scrapy???????? 2. ??????url???scrapy????? ???????parse """ # ???????????url???scrapy???????? if response.status == 404: self.fail_urls.append(response.url) self.crawler.stats.inc_value("failed_url") #?extra?list???????? post_nodes = response.css("#archive .floated-thumb .post-thumb a") for post_node in post_nodes: #??????url image_url = post_node.css("img::attr(src)").extract_first("") post_url = post_node.css("::attr(href)").extract_first("") #request?????????parse_detail?????????? # Request(url=post_url,callback=self.parse_detail) yield Request(url=parse.urljoin(response.url, post_url), meta={"front_image_url": image_url}, callback=self.parse_detail) #??href????????? #response.url + post_url print(post_url) # ????????scrapy???? next_url = response.css(".next.page-numbers::attr(href)").extract_first("") if next_url: yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
def getCsvReport(product_list, startdate, enddate, source_obj):
    print()
    print("Requesting a csv report for the given time period")
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }
    path = "/billing-usage/v1/contractUsageData/csv"
    parameters = {
        "reportSources": [source_obj],
        "products": product_list,
        "startDate": startdate,
        "endDate": enddate
    }
    data_string = parse.urlencode({p: json.dumps(parameters[p]) for p in parameters})
    products_result = session.post(parse.urljoin(baseurl, path), data=data_string, headers=headers)
    products_csv = products_result.text
    return products_csv
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))

    return message
def _setup_token(self):
    self.logger.warn("\n\n(One Time Setup) Please create a Personal Access Token")
    self.logger.warn("https://%s/profile/personal_access_tokens" % self.origin_domain)
    self.logger.warn("Scope: API, Expires: Never\n")
    token = input("Please enter your Personal Access Token: ")

    # Make request to a resource that requires us to be authenticated
    path = 'projects/%s/labels' % self._url_encoded_path()
    url = urljoin(str(self._API()), path)
    res = requests.get(url, headers={"PRIVATE-TOKEN": token})

    if res.status_code == 200:
        return (token, None)

    return (-1, "Invalid Personal Access Token")
def url_for(text, font, color, back_color, size_fixed=False, align='center', stretch=True):
    base_url = app.config['SITE_BASE_URL']
    payload = {
        'text': text,
        'font': font,
        'color': color,
        'back_color': back_color,
        'size_fixed': str(size_fixed).lower(),
        'align': align,
        'stretch': str(stretch).lower()
    }
    return urljoin(base_url, 'emoji') + '?' + urlencode(payload)
def spaces_page(context):
    """Go to the Spaces page with list of available Spaces."""
    print("Spaces page")
    url = urljoin(context.server, context.username + "/_spaces")
    context.browser.visit(url)
    space_names = get_all_existing_space_names(context.browser)
    new_space_name = generate_unique_space_name(space_names)
    context.space_name = new_space_name
    print("Unique name for new Space\n " + new_space_name)
    create_new_space_step_1(context)
    create_new_space_step_2(context)
    create_new_space_step_3(context)
    create_new_space_step_4(context)
    create_new_space_step_5(context)
    create_new_space_step_6(context)
    create_new_space_step_7(context)
def stack_reccomendation_on_pipepines_page(context):
    """Check the presence of stack recommendation on the Pipelines page."""
    url = urljoin(context.server, context.username + "/" + context.space_name + "/create/pipelines")
    print("Going to the pipeline page for the Space {s}".format(s=context.space_name))
    context.browser.visit(url)
    time.sleep(SLEEP_BEFORE_CLICK)
    check_text_presence(context, "Stack Reports")
    link = context.browser.find_by_text("Stack Reports")
    link.click()
    time.sleep(SLEEP_BETWEEN_PAGES)
    # TODO - ask why the text is different: Recommendation/Recommended
    recommendation1 = 'Recommended - Change io.vertx:vertx-web : 3.4.1'
    check_text_presence(context, recommendation1)
    recommendation2 = 'Recommended - Change io.vertx:vertx-core : 3.4.1'
    check_text_presence(context, recommendation2)
    time.sleep(SLEEP_BETWEEN_PAGES)
def _normalize_name(self, name):
    """
    Normalizes the name so that paths like /path/to/ignored/../foo.txt
    work. We check to make sure that the path pointed to is not outside
    the directory specified by the LOCATION setting.
    """
    base_path = force_text(self.location)
    base_path = base_path.rstrip('/')

    final_path = urljoin(base_path.rstrip('/') + "/", name)

    base_path_len = len(base_path)
    if (not final_path.startswith(base_path) or
            final_path[base_path_len:base_path_len + 1] not in ('', '/')):
        raise SuspiciousOperation("Attempted access to '%s' denied." % name)

    return final_path.lstrip('/')
def _prepare_api_url(self, index_url):  # pragma: nocover
    if not index_url.endswith('/'):
        index_url += '/'
    if index_url.endswith('/simple/'):
        self.PYPI_API_TYPE = 'simple_html'
        return urljoin(index_url, '{package}')
    if index_url.endswith('/+simple/'):
        self.PYPI_API_TYPE = 'simple_html'
        return urljoin(index_url, '{package}')
    if '/pypi/' in index_url:
        base_url = index_url.split('/pypi/')[0]
        return urljoin(base_url, '/pypi/{package}/json')
    return urljoin(index_url, '/pypi/{package}/json')
def parse_lista_diputados(response):
    tree = fromstring(response.content)

    # list of deputies
    diputados = tree.xpath('//div[@class="listado_1"]/ul/li/a/@href')
    for diputado in diputados:
        diputado_url = urljoin(response.url, diputado)
        response = requests.get(diputado_url)
        parse_diputado(response)

    # next page
    pagina_siguiente = tree.xpath('//a[contains(., "Página Siguiente")]/@href')
    if pagina_siguiente:
        pagina_siguiente_url = pagina_siguiente[0]
        response = requests.get(pagina_siguiente_url)
        parse_lista_diputados(response)
def get_file_urls(mainUrl, extension):
    uniFileUrls = []
    if not mainUrl.lower().startswith('http://') and not mainUrl.lower().startswith('https://'):
        mainUrl = 'http://%s' % mainUrl
    print('Downloading from %s...' % mainUrl)
    if extension.startswith('*'):
        extension = extension[1:]
    if not extension.startswith('.'):
        extension = '.' + extension
    req = urllib.request.Request(
        mainUrl,
        data=None,
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        }
    )
    urlContent = urllib.request.urlopen(req).read().decode('utf-8')
    html = lxml.html.fromstring(urlContent)
    urls = html.xpath('//a/@href')
    for url in urls:
        if url.endswith(extension):
            url = urljoin(mainUrl, url)
            if url not in uniFileUrls:
                uniFileUrls.append(url)
    return uniFileUrls
def parse(self, response):
    '''
    1. Extract the article URLs from the list page and hand them to scrapy for download and parsing.
    2. Extract the next-page URL and hand it to scrapy; once downloaded, the response is passed back to parse.
    :param response:
    :return:
    '''
    # Extract the article URLs from the list page and hand them to scrapy for download
    post_nodes = response.css("#archive .floated-thumb .post-thumb a")
    for post_node in post_nodes:
        # image_url is the article's cover image
        image_url = post_node.css("img::attr(src)").extract_first("")
        post_url = post_node.css("::attr(href)").extract_first("")
        # Pass the cover image URL along via meta; parse.urljoin joins a relative href
        # with response.url, so the request URL is the join of response.url and post_url
        yield Request(url=parse.urljoin(response.url, post_url),
                      meta={"front_image_url": parse.urljoin(response.url, image_url)},
                      callback=self.parse_detail)

    # Extract the next-page URL and hand it to scrapy
    next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
    if next_url:
        yield Request(url=next_url, callback=self.parse)
def create_request(self, method, path, options):
    """Creating a request with the given arguments

    If api_version is set, appends it immediately after host
    """
    version = '/' + options['api_version'] if 'api_version' in options else ''

    # Adds a suffix (ex: ".html", ".json") to url
    suffix = options['response_type'] if 'response_type' in options else 'json'
    path = path + '.' + suffix

    path = urlparse.urljoin(self.base, version + path)

    if 'api_version' in options:
        del options['api_version']

    if 'response_type' in options:
        del options['response_type']

    return requests.request(method, path, **options)
def _resolve_version(version):
    """
    Resolve LATEST version
    """
    if version is not LATEST:
        return version

    meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json')
    resp = urlopen(meta_url)
    with contextlib.closing(resp):
        try:
            charset = resp.info().get_content_charset()
        except Exception:
            # Python 2 compat; assume UTF-8
            charset = 'UTF-8'
        reader = codecs.getreader(charset)
        doc = json.load(reader(resp))

    return str(doc['info']['version'])
def _relocate(self, response, newloc):
    if len(self._history) > 5:
        raise RuntimeError("Maximum Redirects Reached")

    if response.status_code in (301, 302):
        if self.get_method() in ("POST", "PUT"):
            self.payload = b""
            self.set_content_length(0)
            self.remove_header('content-type')
            self.remove_header('content-length')
            self.remove_header('transfer-encoding')
            self.remove_header('content-encoding')
            self.method = "GET"

    if not newloc:
        newloc = response.get_header('location')
    self.uri = urljoin(self.uri, newloc)
    self.address, self.path = self.split(self.uri)
    self.add_history(response)
def get_certificate_url(mmtrack, course):
    """
    Find certificate associated with highest passing grade for the course

    Args:
        mmtrack (dashboard.utils.MMTrack): an instance of all user information about a program
        course (courses.models.Course): A course
    Returns:
        str: url to view the certificate
    """
    url = ""
    final_grades = mmtrack.get_passing_final_grades_for_course(course)
    if final_grades.exists():
        best_grade = final_grades.first()
        course_key = best_grade.course_run.edx_course_key
        if mmtrack.financial_aid_available:
            if best_grade.has_certificate and course.signatories.exists():
                url = reverse('certificate', args=[best_grade.certificate.hash])
        elif mmtrack.has_passing_certificate(course_key):
            download_url = mmtrack.certificates.get_verified_cert(course_key).download_url
            if download_url:
                url = urljoin(settings.EDXORG_BASE_URL, download_url)
    return url
def test_url_with_course_key(self):
    """Test course url with a course key and no enrollment_url"""
    course_run = CourseRunFactory.create(
        course=self.course,
        start_date=self.from_weeks(-1),
        end_date=None,
        enrollment_start=self.from_weeks(-1),
        enrollment_end=None,
        enrollment_url=None,
        edx_course_key="course_key"
    )
    expected = urljoin(
        BASE_URL,
        'courses/{key}/about'.format(key=course_run.edx_course_key)
    )
    assert course_run.course.url == expected
def get_links_from_url(url):
    """Download the page at `url` and parse it for links.

    Returned links have had the fragment after `#` removed, and have been made
    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.
    """
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body if isinstance(response.body, str) \
            else response.body.decode()
        urls = [urljoin(url, remove_fragment(new_url))
                for new_url in get_links(html)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)
def compare_by_title(self, title: str) -> bool:

    r = requests.get(
        urljoin(constants.main_url, 'search/') + quote(title),
        headers=self.settings.requests_headers,
        timeout=self.settings.timeout_timer
    )

    r.encoding = 'utf-8'
    soup_1 = BeautifulSoup(r.text, 'html.parser')

    matches_links = set()

    # content-row manga row
    for gallery in soup_1.find_all("div", class_=re.compile("content-row")):
        link_container = gallery.find("a", class_="content-title")
        if link_container:
            matches_links.add(urljoin(constants.main_url, link_container['href']))

    self.gallery_links = list(matches_links)
    if len(self.gallery_links) > 0:
        self.found_by = self.name
        return True
    else:
        return False
def api_get(self, url, *args, **kwargs):
    """Gets an object from factorio mod api."""
    url = urljoin(FACTORIO_BASEURL, url)
    assert url.startswith(urljoin(FACTORIO_BASEURL, "/api")), \
        "Only factorio mods api can be used with this"

    qparams = ""
    if "params" in kwargs:
        qparams = "&".join(key + "=" + str(kwargs["params"][key])
                           for key in sorted(kwargs["params"].keys()))
    cache_key = urlparse(url).path + "?" + qparams

    res = self.fetch(cache_key)
    if res:
        return json.loads(res)

    data = self.get(url, *args, **kwargs).text
    # check and minify data
    try:
        data = json.dumps(json.loads(data), separators=(',', ':'))
    except json.decoder.JSONDecodeError:
        exit("Invalid JSON data in cache")
    self.store(cache_key, data)
    return json.loads(data)
def get_links(etree, page_url):
    """
    page_url: the url of the page parsed in the etree
    """
    _is_etree(etree)
    links = [urljoin(page_url, i) for i in etree.xpath('//a/@href')]
    localhost = urlparse(page_url).hostname
    internal = set()
    external = set()
    for link in links:
        # Links on the same host as page_url are internal; everything else is external
        if urlparse(link).hostname == localhost:
            internal.add(link)
        else:
            external.add(link)
    return {'internal': list(internal), 'external': list(external)}
def parse(self, response): """ ???html??????url ?????url?????? ?????url???? /question/xxx ????????????? """ all_urls = response.css("a::attr(href)").extract() all_urls = [parse.urljoin(response.url, url) for url in all_urls] # ??lambda???????url????????true???????false??? all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls) for url in all_urls: match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url) if match_obj: # ?????question??????????????????? request_url = match_obj.group(1) yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question) #?? # break else: # pass # ????question?????????? yield scrapy.Request(url, headers=self.headers, callback=self.parse)
def dns_resolve(self, domain):
    payload = {'hostnames': [domain], 'key': self.api_key}
    r = requests.get(urljoin(self.base_url, 'dns/resolve'), params=payload)

    if r.status_code == requests.codes.ok:
        return r.json()
def reverse_dns(self, ip):
    payload = {'ips': [ip], 'key': self.api_key}
    r = requests.get(urljoin(self.base_url, 'dns/reverse'), params=payload)

    if r.status_code == requests.codes.ok:
        result = r.json()
        if not result[ip]:
            result[ip] = []
        return result
def _request(self, method, params):
    url = urljoin(SlackClient.BASE_URL, method)
    data = {'token': self.token}
    if self.bot_name:
        data['username'] = self.bot_name
    params.update(data)
    return requests.post(
        url,
        data=params,
        headers={'content-type': 'application/x-www-form-urlencoded'}
    )
def _join_url(self, path):
    if path.startswith('/'):
        path = path[1:]
    return urljoin(ZoomClient.BASE_URL, path)
def get_url(self, routename, **kargs):
    """ Return a string that matches a named route """
    scriptname = request.environ.get('SCRIPT_NAME', '').strip('/') + '/'
    location = self.router.build(routename, **kargs).lstrip('/')
    return urljoin(urljoin('/', scriptname), location)
def fullpath(self):
    """ Request path including :attr:`script_name` (if present). """
    return urljoin(self.script_name, self.path.lstrip('/'))
def redirect(url, code=None):
    """ Aborts execution and causes a 303 or 302 redirect, depending on
        the HTTP protocol version. """
    if not code:
        code = 303 if request.get('SERVER_PROTOCOL') == "HTTP/1.1" else 302
    res = response.copy(cls=HTTPResponse)
    res.status = code
    res.body = ""
    res.set_header('Location', urljoin(request.url, url))
    raise res
def url(self, filename):
    full_path = op.join(self.name, self.version, filename)
    return urljoin(DEFAULT_BASE_URL, full_path)
def url(self, filename):
    full_path = op.join(self.name, self.version, filename)
    return urljoin(self.base_url, full_path)
def is_safe_url(target):
    ref_url = urlparse(request.host_url)
    test_url = urlparse(urljoin(request.host_url, target))
    return test_url.scheme in ('http', 'https') and \
        ref_url.netloc == test_url.netloc
def make_call(self, method, endpoint, headers=None, payload=None, params=None):
    """
    Make an authenticated synchronous HTTP call to the Emarsys api using
    the requests library.
    :param method: HTTP method.
    :param endpoint: Emarsys' api endpoint.
    :param headers: HTTP headers.
    :param payload: HTTP payload.
    :param params: HTTP params.
    :return: Dictionary with the result of the query.
    """
    if not payload:
        payload = {}
    if not params:
        params = {}
    url = urljoin(self.uri, endpoint)
    headers = self.build_headers(headers)
    response = requests.request(
        method,
        url,
        headers=headers,
        json=payload,
        params=params
    )
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        raise ApiCallError(
            'Error message: "{}" \n Error details: "{}"'.format(
                err,
                response.text
            )
        )
    return response.json()
def getProducts(parameter_obj, startdate, enddate):
    print()
    print("Requesting a list of products for the given time period")
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'application/json'
    }
    path = "/billing-usage/v1/products"
    parameters = {
        "reportSources": parameter_obj,
        "startDate": startdate,
        "endDate": enddate
    }
    data_string = encoder.urlencode({p: json.dumps(parameters[p]) for p in parameters})
    products_result = session.post(parse.urljoin(baseurl, path), data=data_string, headers=headers)
    products_obj = json.loads(products_result.text)
    return products_obj['contents']
def urlJoin(self, url, path):
    return parse.urljoin(url, path)
def getResult(self, endpoint, parameters=None):
    path = endpoint
    endpoint_result = self.session.get(parse.urljoin(self.baseurl, path), params=parameters)
    if self.verbose:
        print(">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
    status = endpoint_result.status_code
    if self.verbose:
        print("LOG: GET %s %s %s" % (endpoint, status, endpoint_result.headers["content-type"]))
    self.httpErrors(endpoint_result.status_code, path, endpoint_result.json())
    return endpoint_result.json()
def postResult(self, endpoint, body, parameters=None):
    headers = {'content-type': 'application/json'}
    path = endpoint
    endpoint_result = self.session.post(parse.urljoin(self.baseurl, path),
                                        data=body, headers=headers, params=parameters)
    status = endpoint_result.status_code
    if self.verbose:
        print("LOG: POST %s %s %s" % (path, status, endpoint_result.headers["content-type"]))
    if status == 204:
        return {}
    self.httpErrors(endpoint_result.status_code, path, endpoint_result.json())
    if self.verbose:
        print(">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
    return endpoint_result.json()
def putResult(self, endpoint, body, parameters=None):
    headers = {'content-type': 'application/json'}
    path = endpoint
    endpoint_result = self.session.put(parse.urljoin(self.baseurl, path),
                                       data=body, headers=headers, params=parameters)
    status = endpoint_result.status_code
    if self.verbose:
        print("LOG: PUT %s %s %s" % (endpoint, status, endpoint_result.headers["content-type"]))
    if status == 204:
        return {}
    if self.verbose:
        print(">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
    return endpoint_result.json()
def deleteResult(self, endpoint):
    endpoint_result = self.session.delete(parse.urljoin(self.baseurl, endpoint))
    status = endpoint_result.status_code
    if self.verbose:
        print("LOG: DELETE %s %s %s" % (endpoint, status, endpoint_result.headers["content-type"]))
    if status == 204:
        return {}
    if self.verbose:
        print(">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
    return endpoint_result.json()
def run(filename, config):
    API_URL = "https://api.github.com/"
    asset_name = os.path.split(filename)[-1]
    content_type = guess_type(asset_name)[0] or "application/zip"
    creds = (config["username"], config["token"])
    release_info_url = urljoin(API_URL, "/repos/{}/releases/tags/{}".format(
        config["repo"], config["release_tag"]))

    # get release info
    try:
        resp = requests.get(release_info_url, auth=creds)
    except:
        raise TransportException("Failed to connect to GitHub API")
    if resp.status_code != 200:
        raise TransportException("Check your GitHub API auth settings")

    # delete old asset
    for x in resp.json()["assets"]:
        if x["name"] == asset_name:
            r = requests.delete(x["url"], auth=creds)
            if r.status_code != 204:
                raise TransportException("Failed to delete asset from GitHub")

    # upload new asset
    upload_url = resp.json()["upload_url"].split("{")[0]  # wat
    headers = {'Content-Type': content_type}
    params = {'name': asset_name}
    data = open(filename, 'rb').read()
    r = requests.post(upload_url, headers=headers, params=params, auth=creds, data=data)
    if r.status_code != 201:
        raise TransportException("Failed to upload asset to GitHub API")
def urlopen(self, method, url, redirect=True, **kw):
    """
    Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
    with custom cross-host redirect logic and only sends the request-uri
    portion of the ``url``.

    The given ``url`` parameter must be absolute, such that an appropriate
    :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.
    """
    u = parse_url(url)
    conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)

    kw['assert_same_host'] = False
    kw['redirect'] = False
    if 'headers' not in kw:
        kw['headers'] = self.headers

    if self.proxy is not None and u.scheme == "http":
        response = conn.urlopen(method, url, **kw)
    else:
        response = conn.urlopen(method, u.request_uri, **kw)

    redirect_location = redirect and response.get_redirect_location()
    if not redirect_location:
        return response

    # Support relative URLs for redirecting.
    redirect_location = urljoin(url, redirect_location)

    # RFC 2616, Section 10.3.4
    if response.status == 303:
        method = 'GET'

    log.info("Redirecting %s -> %s" % (url, redirect_location))
    kw['retries'] = kw.get('retries', 3) - 1  # Persist retries countdown
    kw['redirect'] = redirect
    return self.urlopen(method, redirect_location, **kw)
def combine_urls(self, parent_url, child_url):
    return urljoin(parent_url, child_url)
def parse(self, res):
    # Parse the html into an etree
    etree = self.e_html(res.html)
    # Collect the pagination URLs
    pages = list(set(i.get('href') for i in etree.cssselect('li.pb_list_pager>a')))
    pages.append(self.start_urls[0])
    for page in pages:
        url = urljoin(self.start_urls[0], page)
        yield Request(url, headers=self.headers, callback=self.parse_item)
def _API(self):
    url = 'https://%s/' % self.origin_domain
    path = 'api/v4/'
    return urljoin(str(url), str(path))
def parent_branch_exists(self, token):
    path = '/repos/%s/%s/branches/%s' % (self.namespace, self.project, self.parent_branch)
    url = urljoin(str(self.API), str(path))
    try:
        res = requests.get(url)
    except Exception as e:
        self.logger.fatal(e)
        sys.exit()

    if res.status_code == 404:
        return False
    return True
def parse(self, response):
    # Extract the link to the customs district page
    custom_district = response.xpath('//div[@id="tmenu_126614"]/table/tr[5]//a/@href').extract()[0]
    if custom_district:
        self.log('custom_district: %s' % custom_district)
        yield scrapy.Request(url=custom_district, callback=self.get_custom_district)

    '''
    # Extract the link to the monthly statistics magazine
    monthly_magazine = response.xpath('//div[@id="tmenu_126614"]/table/tr[2]//a/@href').extract()[0]
    if monthly_magazine:
        monthly_magazine_url = urljoin('http://www.customs.gov.cn', str(monthly_magazine))
        self.log('monthly_magazine: %s' % monthly_magazine_url)
        yield scrapy.Request(url=monthly_magazine_url, callback=self.get_monthly_magazine)

    # Work out how many result pages there are
    pages_text = response.xpath('//div[@id="ess_ctr175903_ListC_Info_AspNetPager"]/table/tr/td[1]//text()').extract()
    pages_count = str(pages_text[2]).split('/')[1]
    print(pages_count)
    i = 1
    while i <= int(pages_count):
        print(i)
        page_url = urljoin('http://www.customs.gov.cn', '/publish/portal0/tab49666/module175903/page%d.htm' % i)
        self.log('page_url: %s' % page_url)
        i += 1
        yield scrapy.Request(url=page_url, callback=self.get_flash)
    '''
def get_flash(self, response):
    # Extract the article links on the list page
    page_list = response.xpath('//li[@class="liebiaoys24"]/span/a/@href').extract()
    for page in page_list:
        if 'http' in page:
            self.log("flash_url: %s" % str(page))
            yield scrapy.Request(url=str(page), callback=self.parse_flash)
        else:
            flash_url = urljoin('http://www.customs.gov.cn', str(page))
            self.log("flash_url: %s" % flash_url)
            # Relative links are joined with the site root before being handed to parse_flash
            yield scrapy.Request(url=flash_url, callback=self.parse_flash)