The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib3.PoolManager().
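All of the examples share the same basic pattern: construct a PoolManager (optionally configured with TLS verification, timeouts, and retries) and issue requests through its request() method. As a minimal sketch, assuming any reachable URL (the one below is only a placeholder):

import urllib3

# Create one PoolManager and reuse it; it maintains a connection pool per host.
http = urllib3.PoolManager()

# request() returns an HTTPResponse; status and data hold the result.
resp = http.request('GET', 'https://example.com/')
print(resp.status)      # HTTP status code, e.g. 200
print(resp.data[:80])   # response body as bytes
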
def __init__(self, headers=None, retries=None, validate_certificate=True,
             urlfetch_retries=True):
    if not urlfetch:
        raise AppEnginePlatformError(
            "URLFetch is not available in this environment.")
    if is_prod_appengine_mvms():
        raise AppEnginePlatformError(
            "Use normal urllib3.PoolManager instead of AppEngineManager "
            "on Managed VMs, as using URLFetch is not necessary in "
            "this environment.")
    warnings.warn(
        "urllib3 is using URLFetch on Google App Engine sandbox instead "
        "of sockets. To use sockets directly instead of URLFetch see "
        "https://urllib3.readthedocs.io/en/latest/reference/urllib3.contrib.html.",
        AppEnginePlatformWarning)

    RequestMethods.__init__(self, headers)
    self.validate_certificate = validate_certificate
    self.urlfetch_retries = urlfetch_retries
    self.retries = retries or Retry.DEFAULT

def ensure_dataset_exists(files, dirname):
    path = os.path.join("data", dirname)
    rv = [os.path.join(path, f) for f in files]
    logger.info("Retrieving dataset from {}".format(path))
    if not os.path.exists(path):
        # Extract or download data
        try:
            os.makedirs(path)
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise
    for f, file_path in zip(files, rv):
        data_url = BASE_URL + dirname + "/" + f
        if not os.path.exists(file_path):
            logger.warn("Downloading {}".format(data_url))
            with urllib3.PoolManager().request('GET', data_url,
                                               preload_content=False) as r, \
                    open(file_path, 'wb') as w:
                shutil.copyfileobj(r, w)
    return rv

# Convert data into a stream of never-ending data

def fetch_service_config_rollout_strategy(metadata):
    """Fetch service config rollout strategy from metadata URL."""
    url = metadata + _METADATA_PATH + "/attributes/" + \
        _METADATA_ROLLOUT_STRATEGY
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except:
        logging.info("Failed to fetch service config rollout strategy " +
                     "from the metadata server: " + url)
        return None

    status_code = response.status
    if status_code != 200:
        # Fetching rollout strategy is optional. No need to leave log
        return None

    rollout_strategy = response.data
    logging.info("Service config rollout strategy: " + rollout_strategy)
    return rollout_strategy

def fetch_service_name(metadata):
    """Fetch service name from metadata URL."""
    url = metadata + _METADATA_PATH + "/attributes/" + _METADATA_SERVICE_NAME
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", url, headers=headers)
    except:
        raise FetchError(1,
            "Failed to fetch service name from the metadata server: " + url)

    status_code = response.status
    if status_code != 200:
        message_template = "Fetching service name failed (url {}, status code {})"
        raise FetchError(1, message_template.format(url, status_code))

    name = response.data
    logging.info("Service name: " + name)
    return name

# config_id from metadata is optional. Returns None instead of raising error

def fetch_access_token(metadata):
    """Fetch access token from metadata URL."""
    access_token_url = metadata + _METADATA_PATH + \
        "/service-accounts/default/token"
    headers = {"Metadata-Flavor": "Google"}
    client = urllib3.PoolManager(ca_certs=certifi.where())
    try:
        response = client.request("GET", access_token_url, headers=headers)
    except:
        raise FetchError(1,
            "Failed to fetch access token from the metadata server: " +
            access_token_url)

    status_code = response.status
    if status_code != 200:
        message_template = "Fetching access token failed (url {}, status code {})"
        raise FetchError(1, message_template.format(access_token_url, status_code))

    token = json.loads(response.data)["access_token"]
    return token

def do_http_request(self, start_path='', send_counts=None):
    statuses = self.load_balancers_client.show_load_balancer_status_tree(
        self.loadbalancer['id'])
    statuses = statuses.get('statuses', statuses)
    self.http_cnt = {}
    http = urllib3.PoolManager(retries=10)
    send_counts = send_counts or self.poke_counters
    # floor division keeps the count an int so range() also works on Python 3
    send_counts = (send_counts * 2) // 2
    url_path = "http://{0}/{1}".format(self.vip_ip_address, start_path)
    for x in range(send_counts):
        resp = http.request('GET', url_path)
        if resp.status == 200:
            self.count_response(resp.data.strip())
        else:
            self.count_response(str(resp.status))
    return self.http_cnt

def __init__(self) -> None:
    http_client = urllib3.PoolManager(
        timeout=urllib3.Timeout.DEFAULT_TIMEOUT,
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        retries=urllib3.Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[500, 502, 503, 504]
        ),
        maxsize=20
    )
    self.client = minio.Minio(
        S3_SERVER,
        access_key=S3_ACCESS_KEY,
        secret_key=S3_SECRET_KEY,
        region=S3_REGION,
        secure=S3_SERVER == 's3.amazonaws.com',
        http_client=http_client
    )

def mdn_cmd(self, listener, sender, target, args):
    arg_list = args.split()
    if len(arg_list) < 1:
        self.messenger.msg(target,
                           "https://developer.mozilla.org/ - Mozilla Developer Network")
        return
    http = urllib3.PoolManager()
    base_url = "https://ajax.googleapis.com/ajax/services/search/web?v=1.0&q="
    mdn_url = "%20site%3Adeveloper.mozilla.org"
    full_url = base_url + args + mdn_url
    o = urlparse(full_url)
    r = http.request("GET", o.geturl())
    pd = json.loads(r.data.decode('UTF-8'))
    firsturl = pd['responseData']['results'][0]['url']
    self.messenger.msg(target, firsturl)

def getWashPostText(url, token):
    # this function takes the url of an article and returns the article
    # minus the crud - HTML, JavaScript etc.
    try:
        # PoolManager.urlopen is an instance method, so instantiate a
        # PoolManager and fetch the page body via request()
        page = urllib3.PoolManager().request('GET', url).data.decode('utf8')
    except:
        # if we are unable to download the url, return title=None; article=None
        return (None, None)
    soup = BeautifulSoup(page)
    if soup is None:
        return (None, None)
    # the error checks were successful, the page was parsed
    text = ""
    if soup.find_all(token) is not None:
        # search the page for tokens which demarcate the article,
        # usually '<article></article>'
        text = ''.join(map(lambda p: p.text, soup.find_all(token)))
        soup2 = BeautifulSoup(text)
        if soup2.find_all('p') is not None:
            text = ''.join(map(lambda p: p.text, soup2.find_all('p')))
    return text, soup.title.text

def scrapeSource(url, magicFrag='2017', scraperFunction=getNYTText, token='None'):
    urlBodies = {}
    requests = urllib3.PoolManager()
    response = requests.request('GET', url)
    soup = BeautifulSoup(response.data)
    # the above lines of code set up the BeautifulSoup page
    # now we find links
    # links are always of the form <a href='url'> link-text </a>
    numErrors = 0
    for a in soup.findAll('a'):
        try:
            url = a['href']
            if url not in urlBodies and (magicFrag is None or magicFrag in url):
                # call the individual scraperFunction
                # (for NYT & washPost) on each article link
                body = scraperFunction(url, token)
                if body and len(body) > 0:
                    urlBodies[url] = body
                print(url)
        except:
            numErrors += 1
    return urlBodies

def __getPage(self, url):
    http = urllib3.PoolManager(
        cert_reqs='CERT_REQUIRED',  # Force certificate check.
        ca_certs=certifi.where(),   # Path to the Certifi bundle.
    )
    data = ''
    try:
        data = http.request(
            'GET', url, timeout=10,
            headers={
                'User-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; de; '
                              'rv:1.9.1.5) Gecko/20091102 Firefox/3.5.5'
            }).data
        codeType = chardet.detect(data)
        data = data.decode(codeType['encoding'])
    except:
        pass
    return data

def get_e2e_configuration():
    config = Configuration()
    config.host = None
    if os.path.exists(
            os.path.expanduser(kube_config.KUBE_CONFIG_DEFAULT_LOCATION)):
        kube_config.load_kube_config(client_configuration=config)
    else:
        print('Unable to load config from %s' %
              kube_config.KUBE_CONFIG_DEFAULT_LOCATION)
        for url in ['https://%s:8443' % DEFAULT_E2E_HOST,
                    'http://%s:8080' % DEFAULT_E2E_HOST]:
            try:
                urllib3.PoolManager().request('GET', url)
                config.host = url
                config.verify_ssl = False
                urllib3.disable_warnings()
                break
            except urllib3.exceptions.HTTPError:
                pass
    if config.host is None:
        raise unittest.SkipTest('Unable to find a running Kubernetes instance')
    print('Running test against : %s' % config.host)
    config.assert_hostname = False
    return config

def __init__(self, headers=None, retries=None, validate_certificate=True,
             urlfetch_retries=True):
    if not urlfetch:
        raise AppEnginePlatformError(
            "URLFetch is not available in this environment.")
    if is_prod_appengine_mvms():
        raise AppEnginePlatformError(
            "Use normal urllib3.PoolManager instead of AppEngineManager "
            "on Managed VMs, as using URLFetch is not necessary in "
            "this environment.")
    warnings.warn(
        "urllib3 is using URLFetch on Google App Engine sandbox instead "
        "of sockets. To use sockets directly instead of URLFetch see "
        "https://urllib3.readthedocs.io/en/latest/contrib.html.",
        AppEnginePlatformWarning)

    RequestMethods.__init__(self, headers)
    self.validate_certificate = validate_certificate
    self.urlfetch_retries = urlfetch_retries
    self.retries = retries or Retry.DEFAULT

def get(self, url, ip, timeoutConnect, timeoutRead):
    headers = {
        'CONTENT-TYPE': 'text/xml; charset="utf-8"',
        'USER-AGENT': 'uPNP/1.0'
    }
    try:
        timeout = urllib3.util.timeout.Timeout(connect=timeoutConnect,
                                               read=timeoutRead)
        http = urllib3.PoolManager(timeout=timeout)
        r = http.request("GET", url, headers=headers)
        if r.status == 200:
            self.found_ip = ip
            print("found " + ip)
            self.kill_after_found()
    except Exception as e:
        pass
        # print("Request for '%s' failed: %s" % (url, e))

def do_get(self, url, top_level=False, top_level_path=""):
    parts = list(urlparse.urlparse(url))
    # 2 is the path offset
    if top_level:
        parts[2] = '/' + top_level_path

    parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
    url = urlparse.urlunparse(parts)

    try:
        if self.disable_ssl_validation:
            urllib3.disable_warnings()
            http = urllib3.PoolManager(cert_reqs='CERT_NONE')
        else:
            http = urllib3.PoolManager()
        r = http.request('GET', url, headers=self.headers)
    except Exception as e:
        # pass the format arguments separately, not as a tuple,
        # so the logger can interpolate both placeholders
        LOG.error("Request on service '%s' with url '%s' failed",
                  self.name, url)
        raise e
    if r.status >= 400:
        raise ServiceError("Request on service '%s' with url '%s' failed"
                           " with code %d" % (self.name, url, r.status))
    return r.data

def main():
    if len(sys.argv) != 3:
        print("Usage:\n\n\tpython preprocess_spec.py kubernetes_branch "
              "output_spec_path")
        return 1
    spec_url = 'https://raw.githubusercontent.com/kubernetes/kubernetes/' \
               '%s/api/openapi-spec/swagger.json' % sys.argv[1]
    output_path = sys.argv[2]
    pool = urllib3.PoolManager()
    with pool.request('GET', spec_url, preload_content=False) as response:
        if response.status != 200:
            print("Error downloading spec file. Reason: %s" % response.reason)
            return 1
        in_spec = json.load(response, object_pairs_hook=OrderedDict)
        out_spec = process_swagger(in_spec)
        with open(output_path, 'w') as out:
            json.dump(out_spec, out, sort_keys=False, indent=2,
                      separators=(',', ': '), ensure_ascii=True)
    return 0

def openshift_main():
    import sys
    import json
    import codecs
    import urllib3
    from collections import OrderedDict
    pool = urllib3.PoolManager()
    reader = codecs.getreader('utf-8')
    spec_url = 'https://raw.githubusercontent.com/openshift/origin/' \
               '%s/api/swagger-spec/openshift-openapi-spec.json' % sys.argv[1]
    output_path = sys.argv[2]
    print("writing to {}".format(output_path))
    with pool.request('GET', spec_url, preload_content=False) as response:
        if response.status != 200:
            print("Error downloading spec file. Reason: %s" % response.reason)
            return 1
        in_spec = json.load(reader(response), object_pairs_hook=OrderedDict)
        out_spec = process_swagger(process_openshift_swagger(in_spec,
                                                             output_path))
        update_codegen_ignore(out_spec, output_path)
        with open(output_path, 'w') as out:
            json.dump(out_spec, out, sort_keys=True, indent=2,
                      separators=(',', ': '), ensure_ascii=True)
    return 0

def mercatox():
    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',
                               ca_certs=certifi.where())
    url = 'https://mercatox.com/public/json24'
    # response = http.request('GET', url, headers=header, timeout=20.0)
    response = http.request('GET', url, timeout=20.0)
    json_mercatox = json.loads(response.data)
    json_array = json_mercatox['pairs']['XRB_BTC']

    try:
        last_price = int(float(json_array['last']) * (10 ** 8))
    except KeyError:
        last_price = 0

    high_price = int(float(json_array['high24hr']) * (10 ** 8))
    low_price = int(float(json_array['low24hr']) * (10 ** 8))
    ask_price = int(float(json_array['lowestAsk']) * (10 ** 8))
    bid_price = int(float(json_array['highestBid']) * (10 ** 8))
    volume = int(float(json_array['baseVolume']))
    btc_volume = int(float(json_array['quoteVolume']) * (10 ** 8))

    mysql_set_price(1, last_price, high_price, low_price, ask_price,
                    bid_price, volume, btc_volume)

def monitoring_block_count():
    # set bot
    bot = Bot(api_key)
    count = int(rpc({"action": "block_count"}, 'count'))
    reference_count = int(reference_block_count())
    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',
                               ca_certs=certifi.where())
    response = http.request('GET', summary_url, headers=header, timeout=20.0)
    try:
        json_data = json.loads(response.data)
        community_count = int(json_data['blocks'])
    except ValueError as e:
        community_count = reference_count
    difference = int(math.fabs(community_count - count))

    response = http.request('GET', block_count_url, headers=header,
                            timeout=20.0)
    raiwallet_count = int(response.data)

    if (difference > block_count_difference_threshold):
        # Warning to admins
        for user_id in admin_list:
            push(bot, user_id,
                 'Block count: {0}\nCommunity: {1}\nDifference: *{2}*\n'
                 'Reference: {3}\nraiwallet.info: {4}'
                 .format(count, community_count, difference,
                         reference_count, raiwallet_count))
        # trying to fix
        bootstrap_multi()

def __connect(self):
    num_pools = float(self.pool_size_total) / self.pool_size_per_route
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Accept': 'application/x-protobuf',
        'User-Agent': 'python-pilosa/' + VERSION,
    }

    timeout = urllib3.Timeout(connect=self.connect_timeout,
                              read=self.socket_timeout)

    client_options = {
        "num_pools": num_pools,
        "maxsize": self.pool_size_per_route,
        "block": True,
        "headers": headers,
        "timeout": timeout,
        "retries": self.retry_count,
    }
    if not self.tls_skip_verify:
        client_options["cert_reqs"] = "CERT_REQUIRED"
        client_options["ca_certs"] = self.tls_ca_certificate_path

    client = urllib3.PoolManager(**client_options)
    self.__client = client

def __init__(self, lat, lon):
    """
    Populates and parses location data in Location object
    """
    url = REQUEST_URL + "?lat=" + lat + "&lng=" + lon + "&date=" + DATE
    http = urllib3.PoolManager()
    r = http.request(REQUEST_TYPE, url)
    self.__RAW_SUN_DATA__ = sun_json_parse(r.data)
    self.__SUN_DATA__ = self.__RAW_SUN_DATA__.split(DATA_SPLIT)
    self.SUNRISE = self.__SUN_DATA__[0]
    self.SUNSET = self.__SUN_DATA__[1]
    self.SOLAR_NOON = self.__SUN_DATA__[2]
    self.DAY_LENGTH = self.__SUN_DATA__[3]
    self.CIVIL_TWILIGHT_BEGIN = self.__SUN_DATA__[4]
    self.CIVIL_TWILIGHT_END = self.__SUN_DATA__[5]
    self.NAUTICAL_TWILIGHT_BEGIN = self.__SUN_DATA__[6]
    self.NAUTICAL_TWILIGHT_END = self.__SUN_DATA__[7]
    self.ASTROLOGICAL_TWILIGHT_BEGIN = self.__SUN_DATA__[8]
    self.ASTROLOGICAL_TWILIGHT_END = self.__SUN_DATA__[9]

def urllib3_test():
    retry = urllib3.util.Retry(total=MAX_RETRIES, connect=MAX_RETRIES,
                               read=MAX_RETRIES, backoff_factor=BACKOFF_FACTOR)
    http = urllib3.PoolManager(retries=retry, timeout=TIMEOUT)

    def attempt(url, http=http, retry=retry):
        r = None
        try:
            r = http.request('GET', url, retries=retry)
        except Exception as e:
            retry = r.retries if r else retry
            retry = retry.increment('GET', url, error=e)
            retry.sleep()
            logging.warning("Retrying (%r) after connection broken by '%r': '%s'",
                            retry, e, url)
            return attempt(url, retry=retry)
        return r

    return json.loads(attempt(URL, http).data.decode('utf-8'))