我们从 Python 开源项目中提取了以下 30 个代码示例,用于说明如何使用 urllib3.disable_warnings()。
def __init__(self, host, username=None, password=None, verify_ssl=True, user_agent=None, cert=None):
    """Store the connection settings and derive the authentication mode.

    :param host: Stored as ``self.host``.
    :param username: Stored as-is; a non-``None`` value selects ``'basic'`` auth.
    :param password: Stored as-is.
    :param verify_ssl: When falsy, urllib3's ``InsecureRequestWarning`` is silenced.
    :param user_agent: Overrides the default ``'webinspectapi/' + version`` when truthy.
    :param cert: A non-``None`` value selects ``'certificate'`` auth when no
        username was given.
    """
    self.host = host
    self.username = username
    self.password = password
    self.cert = cert
    self.verify_ssl = verify_ssl
    self.user_agent = user_agent if user_agent else 'webinspectapi/' + version

    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # A username takes precedence over a client certificate.
    mode = 'unauthenticated'
    if cert is not None:
        mode = 'certificate'
    if username is not None:
        mode = 'basic'
    self.auth_type = mode
def get_e2e_configuration():
    """Return a client ``Configuration`` for the end-to-end tests.

    The default kube config file is loaded when it exists. Otherwise the
    HTTPS and HTTP endpoints on ``DEFAULT_E2E_HOST`` are probed in that order
    and the first one that answers becomes ``config.host``.

    :raises unittest.SkipTest: when no host could be determined.
    """
    config = Configuration()
    config.host = None

    kube_config_path = os.path.expanduser(kube_config.KUBE_CONFIG_DEFAULT_LOCATION)
    if os.path.exists(kube_config_path):
        kube_config.load_kube_config(client_configuration=config)
    else:
        print('Unable to load config from %s' %
              kube_config.KUBE_CONFIG_DEFAULT_LOCATION)
        candidates = ['https://%s:8443' % DEFAULT_E2E_HOST,
                      'http://%s:8080' % DEFAULT_E2E_HOST]
        for candidate in candidates:
            try:
                urllib3.PoolManager().request('GET', candidate)
            except urllib3.exceptions.HTTPError:
                # Endpoint not reachable; try the next candidate.
                continue
            config.host = candidate
            config.verify_ssl = False
            urllib3.disable_warnings()
            break

    if config.host is None:
        raise unittest.SkipTest('Unable to find a running Kubernetes instance')

    print('Running test against : %s' % config.host)
    config.assert_hostname = False
    return config
def do_get(self, url, top_level=False, top_level_path=""):
    """Issue a GET for *url* and return the raw response body.

    :param url: URL to fetch; runs of slashes in its path are collapsed using
        ``MULTIPLE_SLASH``.
    :param top_level: When true, the path of *url* is replaced by
        ``'/' + top_level_path`` before the request is made.
    :param top_level_path: Path used when *top_level* is true.
    :returns: ``r.data`` of the urllib3 response.
    :raises ServiceError: when the response status is 400 or above.
    :raises Exception: any error raised while sending the request is logged
        and re-raised unchanged.
    """
    parts = list(urlparse.urlparse(url))
    # Index 2 of the parsed tuple is the path component.
    if top_level:
        parts[2] = '/' + top_level_path
    parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
    url = urlparse.urlunparse(parts)

    try:
        if self.disable_ssl_validation:
            urllib3.disable_warnings()
            http = urllib3.PoolManager(cert_reqs='CERT_NONE')
        else:
            http = urllib3.PoolManager()
        r = http.request('GET', url, headers=self.headers)
    except Exception:
        # Pass the values as separate logging arguments: a single tuple does
        # not satisfy the two '%s' placeholders and breaks log formatting.
        LOG.error("Request on service '%s' with url '%s' failed",
                  self.name, url)
        # Bare raise keeps the original traceback.
        raise

    if r.status >= 400:
        raise ServiceError("Request on service '%s' with url '%s' failed"
                           " with code %d" % (self.name, url, r.status))
    return r.data
def client(self):
    """Return the Kubernetes client cached on the class, creating it on first use.

    The client is stored in ``cls._client``. It is built from username and
    password when both are set on the instance, otherwise from the service
    account.
    """
    cls = self.__class__
    if cls._client is not None:
        return cls._client

    if self.verify_ssl is False:
        # Silence both the copy of urllib3 exposed by requests and the
        # standalone package.
        import requests
        requests.packages.urllib3.disable_warnings()
        import urllib3
        urllib3.disable_warnings()

    if self.username and self.password:
        self.log.debug("Creating Kubernetes client from username and password")
        cls._client = KubernetesClient.from_username_password(
            self.host, self.username, self.password,
            verify_ssl=self.verify_ssl)
    else:
        self.log.debug("Creating Kubernetes client from Service Account")
        cls._client = KubernetesClient.from_service_account(
            self.host, verify_ssl=self.verify_ssl)
    return cls._client
def orion_init():
    """Prompt for any missing Orion credentials and return a SwisClient.

    The module-level ``orion_username`` and ``orion_password`` are filled in
    when empty, so later calls do not prompt again.
    """
    global orion_server, orion_username, orion_password

    if not orion_username:
        fallback_user = getuser()
        typed = raw_input('Orion username [' + fallback_user + ']: ')
        orion_username = typed or fallback_user

    if not orion_password:
        orion_password = getpass('Orion password: ')

    # 'SolarWinds-Orion' is a special hostname in /etc/hosts; it was needed
    # to make SSL checking work, see
    # https://github.com/solarwinds/orionsdk-python#ssl-certificate-verification
    # The call below silences the SubjectAltNameWarning.
    urllib3.disable_warnings()

    # TODO: Need a better/more resilient way of referencing server cert.
    return orionsdk.SwisClient('SolarWinds-Orion', orion_username,
                               orion_password, verify='server.pem')
def urllib3_disable_warnings():
    """Disable urllib3 warnings through whichever urllib3 copy is available.

    The copy exposed as ``requests.packages.urllib3`` is used when requests
    has a ``packages`` attribute. Otherwise the standalone ``urllib3``
    package is used, if it can be imported and provides
    ``disable_warnings``. A missing standalone ``urllib3`` is silently
    ignored.
    """
    # A plain import is enough: Python returns the cached module from
    # sys.modules when it has already been imported.
    import requests

    # On latest Fedora, requests.packages is a symlink
    if hasattr(requests, 'packages'):
        requests.packages.urllib3.disable_warnings()  # pylint: disable=maybe-no-member
        return

    # But with python-requests-2.4.3-1.el7.noarch, we need to talk to
    # urllib3 directly. Import unconditionally: skipping the import when
    # urllib3 is already loaded would leave the local name unbound and the
    # warnings enabled.
    try:
        import urllib3
    except ImportError:
        return

    # Only call disable_warnings if this urllib3 version has it
    if hasattr(urllib3, 'disable_warnings'):
        urllib3.disable_warnings()
def send_push(file):
    """Send a Pushbullet alarm note followed by the video at path *file*.

    Best effort: any exception is caught and reported on stdout instead of
    being propagated.

    :param file: Path of the video file; it is uploaded under the name
        ``'video.mkv'``.
    """
    try:
        urllib3.disable_warnings()
        pb = Pushbullet(API_KEY)
        pb.push_note('Alarm', 'Motion detected')
        # Single-argument print() calls behave the same on Python 2 and 3.
        print("push-uploading file..")
        with open(file, 'rb') as vid:
            file_data = pb.upload_file(vid, 'video.mkv')
        pb.push_file(**file_data)
    except Exception as error:
        print("Error in send_push: " + str(error))
def send(self,u,m="GET",p=None,h=None,c=None):
    """Send an HTTP request through a fresh ``requests`` session.

    :param u: Target URL.
    :param m: HTTP method; only GET and POST (any letter case) are sent.
    :param p: Payload. For GET, a truthy payload is merged into the URL via
        ``Request.ucheck.payload``.
        NOTE(review): the payload is not attached to POST requests - confirm
        whether that is intended.
    :param h: Extra request headers. The mapping is copied, so the caller's
        object is never modified.
    :param c: Cookie name; sent as a cookie with an empty value.
    :returns: The ``requests`` response, or ``None`` for any other method.
    """
    if p is None:
        p = {}
    # Work on a copy so that setting 'user-agent' does not leak into the
    # caller's dictionary.
    h = {} if h is None else dict(h)
    if c is not None:
        c = {c: ''}

    if '-r' in sys.argv or '--ragent' in sys.argv:
        h['user-agent'] = self.ragent()
    else:
        h['user-agent'] = self.agent

    request = requests.Session()
    # Requests below are sent with verify=False; silence the resulting warning.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    method = m.lower()
    if method not in ('get', 'post'):
        return None
    if method == 'get' and p:
        u = '{}'.format(Request.ucheck.payload(u, p))

    return request.request(
        method=m.upper(), url=u, headers=h, cookies=c, timeout=self.time,
        allow_redirects=self.redir,
        proxies={'http': self.proxy, 'https': self.proxy}, verify=False)
def remote_image_exist(registry, image, tag):
    """Return whether *tag* is listed for *image* on *registry*.

    A response whose status is not OK is treated as "does not exist".
    The request is made without certificate verification.
    """
    urllib3.disable_warnings()
    tags_url = IMAGE_TAGS_URL % dict(registry=registry, image=image)
    response = requests.get(url=tags_url, verify=False)
    if response.status_code == http_client.OK:
        return tag in response.json()['tags']
    return False
def get_remote_image_info(image, registry):
    """List ``[registry, image, tag]`` triples for every tag of *image*.

    An error response whose first error code is ``NAME_UNKNOWN`` yields an
    empty list; any other error response raises ``Exception`` carrying the
    decoded JSON body. The request is made without certificate verification.
    """
    urllib3.disable_warnings()
    tags_url = IMAGE_TAGS_URL % dict(registry=registry, image=image)
    response = requests.get(url=tags_url, verify=False)
    # The body is decoded before the status is checked.
    payload = response.json()

    if response.ok:
        return [[registry, image, tag] for tag in payload['tags']]
    if payload['errors'][0]['code'] != 'NAME_UNKNOWN':
        raise Exception(payload)
    return []
def get_image_digest(registry, image, tag):
    """Return the ``Docker-Content-Digest`` header of the manifest for *tag*.

    The v2 manifest media type is requested; the request is made without
    certificate verification.
    """
    urllib3.disable_warnings()
    manifest_url = MANIFEST_URL % dict(registry=registry, image=image,
                                       reference=tag)
    response = requests.get(
        url=manifest_url,
        headers={"Accept": "application/vnd.docker.distribution.manifest.v2+json"},
        verify=False)
    return response.headers['Docker-Content-Digest']
def __init__(self):
    """Create the HTTP session and install the default request headers."""
    self._logger = logger
    # No token is held yet.
    self._token = None

    # Headers attached to every request made through the session.
    default_headers = {
        'User-Agent': 'com.zhihu.android/Futureve/5.1.1 Mozilla/5.0 (Linux; Android 4.4.4; 2014811 Build/KTU84P) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Mobile Safari/537.36 '
                      'Google-HTTP-Java-Client/1.22.0 (gzip)',
        'x-api-version': '3.0.66',
        'x-app-version': '5.1.1',
        'x-app-za': 'OS=Android&Release=4.4.4&Model=2014811&VersionName=5.1.1&VersionCode=533&Width=720'
                    '&Height=1280&Installer=360&WebView=33.0.0.0DeviceType=AndroidPhoneBrand=Xiaomi',
        'x-app-build': 'release',
        'x-network-type': 'WiFi',
        'x-suger': 'SU1FST04Njc2MjIwMjQ1Njc4MDU7QU5EUk9JRF9JRD1jOTY1NGVkMzcwMWRjYjU1O01BQz05Yzo5OTphMDpiZjo3YzpjMQ',
        'x-udid': 'AGCCEL7IpwtLBdi3V7e7dsEXtuW9g1-pets=',
        'Connection': 'keep-alive',
    }

    self._session = requests.session()
    self._session.headers.update(default_headers)

    # Certificate verification is switched off for the whole session ...
    self._session.verify = False
    # ... so silence the warning urllib3 would otherwise emit for each
    # unverified HTTPS request.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _build_cluster_configs(cluster_list):
    """Build one ``ClusterConfig`` per entry of *cluster_list*.

    SDK details of clusters configured earlier are reused from the
    module-level ``g_cluster_configs`` cache; new clusters are configured
    through ``isi_sdk_utils.configure`` and added to that cache.

    The process exits with status 1 when configuring the SDK raises
    ``RuntimeError``.

    :param cluster_list: Iterable of cluster identifiers.
    :returns: List of ``ClusterConfig`` objects, in input order.
    """
    cluster_configs = []
    for cluster in cluster_list:
        username, password, verify_ssl = _get_cluster_auth_data(cluster)

        if cluster in g_cluster_configs:
            cluster_name, isi_sdk, api_client, version = \
                g_cluster_configs[cluster]
        else:
            if verify_ssl is False:
                urllib3.disable_warnings()
            try:
                isi_sdk, api_client, version = \
                    isi_sdk_utils.configure(
                        cluster, username, password, verify_ssl)
            except RuntimeError as exc:
                # sys.stderr.write / print() with a single argument are valid
                # on both Python 2 and Python 3, unlike the print statement.
                sys.stderr.write(
                    "Failed to configure SDK for "
                    "cluster %s. Exception raised: %s\n"
                    % (cluster, str(exc)))
                sys.exit(1)
            print("Configured %s as version %d cluster, using SDK %s."
                  % (cluster, int(version), isi_sdk.__name__))
            cluster_name = \
                _query_cluster_name(cluster, isi_sdk, api_client)
            g_cluster_configs[cluster] = \
                cluster_name, isi_sdk, api_client, version

        cluster_config = \
            ClusterConfig(
                cluster, cluster_name, version, isi_sdk, api_client)
        cluster_configs.append(cluster_config)
    return cluster_configs
def http_request(path, query='', json_body=None, method='get', config=None):
    """Call ``https://<config host>/api/<path>`` and return the parsed JSON reply.

    :param path: API path relative to ``/api/``.
    :param query: Query string, or a dict that is URL-encoded. An empty or
        ``None`` query leaves *path* untouched.
    :param json_body: Request body; when given with the default ``'get'``
        method the request is sent as ``'post'`` instead.
    :param method: HTTP method name.
    :param config: Settings mapping (``host``, ``ssl_verify`` and optionally
        ``auth_user`` / ``auth_pw``); read via ``read_home_config()`` when
        ``None``.
    :returns: Result of ``parse_json`` on the UTF-8 decoded response body.
    :raises: whatever ``raise_response_exception`` raises for a status of
        400 or above.
    """
    if config is None:
        config = read_home_config()

    # Best effort only: failing to silence the warning must never break the
    # request itself.
    try:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    except Exception:
        pass

    if isinstance(query, dict):
        query = urlencode(query)
    # Skip empty queries so the URL does not end in a dangling '?' or '&'.
    if query:
        separator = '&' if '?' in path else '?'
        path += separator + query

    url = 'https://' + config['host'] + '/api/' + path

    auth = None
    if 'auth_user' in config:
        auth = HTTPBasicAuth(config['auth_user'], config['auth_pw'])

    if json_body is not None and method == 'get':
        method = 'post'

    response = requests.request(
        method, url, data=json_body, auth=auth,
        verify=config['ssl_verify'],
        headers={'Accept': 'application/json'}
    )
    if response.status_code >= 400:
        raise_response_exception('Failed request ' + path, response)
    return parse_json(response.content.decode('utf-8'))
def __init__(self, host, api_key, verify_ssl=True, timeout=30, user_agent=None, cert=None, debug=False):
    """
    Set up a ThreadFix API instance.

    :param host: URL of the ThreadFix server (e.g., http://localhost:8080/threadfix/).
    :param api_key: API key generated on the ThreadFix API Key page.
    :param verify_ssl: Whether API requests verify the host's SSL certificate; defaults to true.
    :param timeout: HTTP timeout in seconds; defaults to 30.
    :param user_agent: HTTP user agent string; defaults to "threadfix_api/[version]".
    :param cert: Optional client-side certificate, either a single file (private key plus certificate)
        or a tuple with the paths of both files.
    :param debug: Print requests and responses; useful for debugging.
    """
    self.host = host
    self.api_key = api_key
    self.verify_ssl = verify_ssl
    self.timeout = timeout
    self.user_agent = user_agent if user_agent else 'threadfix_api/' + version
    self.cert = cert
    # When set, request and response information is printed.
    self.debug = debug

    # With verification off, the SSL warning messages would only be noise.
    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Team
def __init__(self, host, token, verify_ssl=True):
    """Record the API endpoint, token and SSL verification setting.

    Any *host* containing ``'github.com'`` maps to the public API endpoint;
    every other host gets ``'/api/v3'`` appended.
    """
    is_public_github = 'github.com' in host
    self.host = 'https://api.github.com' if is_public_github else host + '/api/v3'
    self.token = token
    self.verify_ssl = verify_ssl

    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def disableInsecureRequestWarning():
    """Silence ``InsecureRequestWarning``, preferring the urllib3 exposed by requests.

    The standalone ``urllib3`` package is the fallback when
    ``requests.packages.urllib3`` is not available. Any failure is reported
    through ``ERROR`` and never propagated.
    """
    try:
        try:
            lib = requests.packages.urllib3
        except AttributeError:
            import urllib3 as lib
        lib.disable_warnings(lib.exceptions.InsecureRequestWarning)
    except Exception as err:
        ERROR('???? InsecureRequestWarning ????%s', err)
def getFAinfo(flash_array, apitoken):
    """Retrieve the basic statistics of the given flash array.

    :param flash_array: Array address handed to ``purestorage.FlashArray``.
    :param apitoken: API token used to authenticate.
    :returns: Result of ``fa.get(action='monitor')``.
    """
    urllib3.disable_warnings()
    fa = purestorage.FlashArray(flash_array, api_token=apitoken)
    try:
        return fa.get(action='monitor')
    finally:
        # Always drop the session cookie, even when the query fails.
        fa.invalidate_cookie()
def main1():
    """Prepare the shared HTTP globals, run ``try_1()`` and report two ``<span>`` values.

    Sets the module-level ``http``, ``headers`` and ``url``.

    NOTE(review): ``content`` is never assigned in this function; presumably
    ``try_1()`` publishes it as a module global - confirm.

    :returns: Text of the first ``<span>`` element found in ``content``.
    """
    global http, headers, url

    # Silence urllib3 warnings before any request is made.
    urllib3.disable_warnings()
    # Shared connection pool.
    http = urllib3.PoolManager()
    # Request headers imitating a desktop browser.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36',
        'Content-type': 'text/html;charset=utf-8',
    }
    url = 'https://www.qiushibaike.com'

    print ('1')
    try_1()
    print ('2')

    # Non-greedy capture of every <span> body; re.S lets '.' span newlines.
    b = re.findall('<span>(.*?)</span>', content, re.S)
    print ('??IP?'+b[0],'\n?????'+b[1])
    return b[0]
def main():
    """Prepare the shared HTTP globals, run ``try_1()`` and report the first ``<span>`` value.

    Sets the module-level ``http``, ``headers`` and ``url``.

    NOTE(review): ``content`` is never assigned in this function; presumably
    ``try_1()`` publishes it as a module global - confirm.

    :returns: Text of the first ``<span>`` element found in ``content``.
    """
    global http, headers, url

    print ('start')
    # Silence urllib3 warnings before any request is made.
    urllib3.disable_warnings()
    # Shared connection pool.
    http = urllib3.PoolManager()
    # Request headers imitating a desktop browser.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36',
        'Content-type': 'text/html;charset=utf-8',
    }
    url = 'https://www.qiushibaike.com'
    print ('url is ',url)

    try_1()

    # Non-greedy capture of every <span> body; re.S lets '.' span newlines.
    b = re.findall('<span>(.*?)</span>', content, re.S)
    print ('????'+b[0])
    return b[0]
def __init__(self, api_key):
    """Keep *api_key* on the instance and turn off urllib3 warnings."""
    urllib3.disable_warnings()
    self.api_key = api_key
def __init__(self, host_port, num_tries=5, delay_in_sec=5):
    """Record the target URL and retry settings, then open the connection.

    :param host_port: Either a full ``http://`` / ``https://`` URL or a bare
        ``host[:port]``, which is prefixed with ``https://``.
    :param num_tries: Stored as ``self.num_tries``.
    :param delay_in_sec: Stored as ``self.delay_in_sec``.
    """
    urllib3.disable_warnings()

    has_scheme = host_port.startswith(('http://', 'https://'))
    self.url = host_port if has_scheme else 'https://' + host_port

    self.num_tries = num_tries
    self.delay_in_sec = delay_in_sec
    self.headers = None
    self.conn = None
    # All attributes above are set before the connection is initialised.
    self.init_connection()
def __init__(self):
    """Create the logger and the HTTP session used by the client."""
    self.log = logging.getLogger('Icinga2API.client')

    session = Session()
    # Ask the server for JSON replies.
    session.headers.update({'Accept': 'application/json'})
    self.connection = session

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def report_usage_and_get_warnings(calico_version, hostname, cluster_guid, cluster_size, cluster_type):
    """Report the cluster's guid, size and version to projectcalico.org.

    The status and body returned by the usage server are written to the
    log. Any exception raised while reporting is logged and swallowed.

    :param calico_version: the calico version
    :param hostname: the agent's hostname
    :param cluster_guid: the unique cluster identifier
    :param cluster_size: the number of felix instances
    :param cluster_type: the type of cluster
    """
    _log.info("report_usage_and_get_warnings calico_version=%s, hostname=%s, guid=%s, size=%s, cluster_type=%s",
              calico_version, hostname, cluster_guid, cluster_size, cluster_type)

    try:
        url = 'https://usage.projectcalico.org/UsageCheck/calicoVersionCheck'
        urllib3.disable_warnings()
        pool = urllib3.PoolManager()
        report = dict(
            version=calico_version,
            hostname=hostname,
            guid=cluster_guid,
            size=cluster_size,
            cluster_type=cluster_type,
        )

        # Exponential backoff retry, see
        # http://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
        # This retry is not what prevents a thundering herd (the jitter takes
        # care of that); it only covers dropped or lost connections.
        backoff = urllib3.util.retry.Retry(connect=5, read=5, redirect=5,
                                           backoff_factor=1.0)

        # Send the usage report to projectcalico.org
        r = pool.request('GET', url, fields=report, retries=backoff)
        reply = r.data.decode('utf-8')
        _log.info("usage_report status=%s, reply=%s", r.status, reply)
    except Exception:
        _log.exception("Exception in usage_report")
def get_remote_version():
    """Download the remote setup file and return its ``info`` / ``version`` value.

    The URL of the remote file is the ``setup`` option of the ``info``
    section in the local configuration.
    """
    reader = FileReader()

    # Older urllib3 releases may not provide disable_warnings.
    if hasattr(urllib3, 'disable_warnings'):
        urllib3.disable_warnings()

    pool = urllib3.PoolManager()
    setup_url = reader.get_config().get('info', 'setup')
    response = pool.request('GET', setup_url)

    remote_config = reader.get_config_raw(response.data)
    return remote_config.get('info', 'version')
def http_request(self):
    """GET ``self.url`` and return the UTF-8 encoded body, or ``""`` unless the status is 200."""
    import requests
    import urllib3

    urllib3.disable_warnings()

    response = requests.request(
        "GET", self.url, headers={'cache-control': "no-cache"})
    if response.status_code != 200:
        return ""
    return response.text.encode('utf-8')
def get(self, host, params=()):
    # type: (object, object) -> object
    """Request every URL derived from *host* through a thread pool.

    :param host: Target URL; it is split into scheme and network location.
    :param params: Options handed to ``self.__parse_params``.
    :returns: Dict with ``'count'`` (``self.counter``) and ``'result'``
        (``self.result``).

    Exits the process when the scheme is neither ``http`` nor ``https`` or
    when the run is interrupted from the keyboard.
    """
    self.__is_server_online(host)
    self.__disable_verbose()
    self.__parse_params(params)
    # From here on `host` holds only the network location (host[:port]).
    scheme, host = urlparse(host).scheme, urlparse(host).netloc
    self.DEFAULT_HTTP_PROTOCOL = scheme + "://"
    self.urls = self.__get_urls(host)
    response = {}
    self.HEADER['user-agent'] = self.reader.get_random_user_agent()
    log.info("user-agent : " + self.HEADER['user-agent'])
    log.info('Thread num : ' + str(self.threads))
    try:
        httplib.HTTPConnection.debuglevel = self.debug
        # Older urllib3 releases may not provide disable_warnings.
        if hasattr(urllib3, 'disable_warnings'):
            urllib3.disable_warnings()
        # The port comes from the netloc when it contains ':'; otherwise the
        # scheme's default port (80 / 443) is used.
        if scheme == "http":
            self.http = urllib3.HTTPConnectionPool(host.split(':')[0], port=80 if len(host.split(':')) == 1 else int( host.split(':')[1]), block=True, maxsize=10)
        elif scheme == "https":
            self.http = urllib3.HTTPSConnectionPool(host.split(':')[0], port=443 if len(host.split(':')) == 1 else int( host.split(':')[1]), block=True, maxsize=10)
        else:
            log.critical("not support http protocl, Exit now ")
            sys.exit(1);
        # One work request per URL, each handled by self.request.
        pool = threadpool.ThreadPool(self.threads)
        requests = threadpool.makeRequests(self.request, self.urls)
        for req in requests:
            pool.putRequest(req)
        time.sleep(1)
        # Block until all queued requests have been processed.
        pool.wait()
    except exceptions.AttributeError as e:
        # NOTE(review): `exceptions.AttributeError` and `e.message` look like
        # Python 2 idioms - confirm the target interpreter.
        log.critical(e.message)
    except KeyboardInterrupt:
        log.warning('Session canceled')
        sys.exit()
    self.counter['total'] = self.urls.__len__()
    # NOTE(review): `pool` is unbound here if the AttributeError handler ran
    # before `pool` was assigned - confirm whether that path can occur.
    self.counter['pools'] = pool.workers.__len__()
    response['count'] = self.counter
    response['result'] = self.result
    return response