我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 requests.packages.urllib3.exceptions.InsecureRequestWarning()。
def checkenv(self):
    """Check that ``requests`` is importable and mute its SSL warnings.

    Raises:
        Exception: if ``requests.packages.urllib3`` cannot be imported.
    """
    try:
        import requests.packages.urllib3
    except ImportError:
        raise Exception('requests package not installed. Run pip install -r requirements.txt and try again.')

    bundled_urllib3 = requests.packages.urllib3

    # Disable warnings about SSL connections. A warning class that cannot
    # be imported is skipped silently.
    try:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning as request_warning
        bundled_urllib3.disable_warnings(request_warning)
    except ImportError:
        pass

    try:
        from requests.packages.urllib3.exceptions import InsecurePlatformWarning as platform_warning
        bundled_urllib3.disable_warnings(platform_warning)
    except ImportError:
        pass
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
    """Set up the API base URLs and an authenticated HTTP session.

    Args:
        base_url: Server root URL; a trailing ``/`` is appended when missing.
        project: Project specifier, split by ``get_collection_and_project``
            into a collection and an optional project.
        user: Account name handed to the authentication handler.
        password: Secret handed to the authentication handler.
        verify: Stored as ``self._verify``; when falsy, the
            ``InsecureRequestWarning`` is silenced.
        timeout: Stored as ``self.timeout``.
        auth_type: Callable taking ``(user, password)`` and returning an
            auth object for the session.  When ``None``, the plain
            ``(user, password)`` tuple is used, which ``requests`` treats
            as HTTP Basic auth.
    """
    if not base_url.endswith('/'):
        base_url += '/'

    collection, project = self.get_collection_and_project(project)
    # The collection-level URL drops the part after "/" in the project name,
    # e.g. DefaultCollection/MyProject => DefaultCollection.
    # API response only in Project, without subproject.
    self._url = base_url + '%s/_apis/' % collection
    if project:
        self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
    else:
        self._url_prj = self._url

    self.http_session = requests.Session()
    # Calling the default ``auth_type=None`` would raise TypeError, so fall
    # back to a credentials tuple in that case.
    if auth_type is None:
        self.http_session.auth = (user, password)
    else:
        self.http_session.auth = auth_type(user, password)

    self.timeout = timeout
    self._verify = verify
    if not self._verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def get_room_id(room_name, token):
    """Look up the id of the room titled ``room_name``.

    Returns the id of the last listed room whose title matches, or an empty
    string when nothing matches or the listing request does not answer with
    HTTP 200.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {'Authorization': 'Bearer ' + token,
                       'Content-Type': 'application/json'}
    listing = requests.get('https://api.ciscospark.com/v1/rooms',
                           verify=False,
                           headers=request_headers,
                           proxies=proxies)
    if listing.status_code != 200:
        return ""

    matching_ids = [room['id']
                    for room in json.loads(listing.text)['items']
                    if room['title'] == room_name]
    return matching_ids[-1] if matching_ids else ""
def post_message(message_text, room_id, token, markdown=False):
    """Post ``message_text`` to a room and return the HTTP response.

    The text is sent under the ``markdown`` key when ``markdown`` is truthy
    and under the ``text`` key otherwise.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {'Authorization': 'Bearer ' + token,
                       'Content-Type': 'application/json'}
    content_key = 'markdown' if markdown else 'text'
    payload = json.dumps({'roomId': room_id, content_key: message_text})
    return requests.post('https://api.ciscospark.com/v1/messages',
                         verify=False,
                         headers=request_headers,
                         data=payload,
                         proxies=proxies)
def test_connection(self):
    """Test the connection to the OpenNMS REST API.

    Returns:
        The HTTP status code of a GET on ``<source_url>/info``, or ``-1``
        if there was an error connecting to the server.
    """
    # config
    source = self.__source
    config_rest_url = source.source_url
    config_rest_user = source.source_user
    config_rest_pw = source.source_password

    # test connection to OpenNMS REST API
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_url = config_rest_url + "/info"
    try:
        response = requests.get(request_url,
                                auth=(config_rest_user, config_rest_pw),
                                verify=False)
    except requests.exceptions.RequestException:
        # Only request failures map to -1; KeyboardInterrupt and
        # programming errors are no longer swallowed.
        return -1
    return response.status_code
def __init__(self, host, login, password, bios_password=None, cacert=None):
    """Store the connection settings for this client.

    When no ``cacert`` is supplied, urllib3's ``InsecureRequestWarning`` is
    disabled.
    """
    self.host, self.login, self.password = host, login, password
    self.bios_password = bios_password
    # Message registry support
    self.message_registries = {}
    self.cacert = cacert

    # With verify=False, requests reports "InsecureRequestWarning:
    # Unverified HTTPS request is being made. Adding certificate
    # verification is strongly advised."  A caller who passes no CA
    # certificate did this on purpose, so just disable the warning.
    if cacert is None:
        urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def __init__(self, host, login, password, timeout=60, port=443, cacert=None):
    """Constructor for RIBCLOperations.

    Stores the connection settings and, when no ``cacert`` is supplied,
    disables urllib3's ``InsecureRequestWarning``.
    """
    self.host, self.login, self.password = host, login, password
    self.timeout, self.port = timeout, port
    self.cacert = cacert

    # With verify=False, requests reports "InsecureRequestWarning:
    # Unverified HTTPS request is being made. Adding certificate
    # verification is strongly advised."  A caller who passes no CA
    # certificate did this on purpose, so just disable the warning.
    if cacert is None:
        urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def setUpClass(cls):
    """Create the shared remote and a test user, then clean the DB.

    The clean-up runs up front in case a previous run failed badly enough
    to leave the DB in a bad state despite tearDown().
    """
    remote = BossRemote('test.cfg', API_VER)
    cls.rmt = remote

    # Turn off SSL cert verification.  This is necessary for interacting
    # with developer instances of the Boss.
    for service in (remote.project_service,
                    remote.metadata_service,
                    remote.volume_service):
        service.session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    cls.admin = 'admin'
    cls.user_mgr = 'user-manager'
    cls.rsrc_mgr = 'resource-manager'
    cls.user = 'role_test_user{}'.format(random.randint(0, 9999))

    email = 'jd{}@me.com'.format(random.randint(0, 9999))
    remote.add_user(cls.user, 'John', 'Doe', email, 'password')
    cls.cleanup_db()
def setUpClass(cls):
    """Create the shared remote and the identity fields for a test user.

    All warnings are filtered out and SSL certificate verification is
    switched off for the remote's services.
    """
    warnings.filterwarnings('ignore')
    remote = BossRemote('test.cfg', API_VER)
    cls.rmt = remote

    # Turn off SSL cert verification.  This is necessary for interacting
    # with developer instances of the Boss.
    for service in (remote.project_service,
                    remote.metadata_service,
                    remote.volume_service):
        service.session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    cls.user = 'user_test_user{}'.format(random.randint(0, 9999))
    cls.first_name, cls.last_name = 'john', 'doe'
    cls.email = 'jd{}@me.com'.format(random.randint(0, 9999))
    cls.password = 'password'
def getHTMLCookies(url):
    """Fetch ``url`` through the fixed proxy and return the response cookies.

    Returns ``None`` (after logging the exception) when the download fails
    or the server answers with an HTTP error status.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_options = dict(
        proxies={
            "http": "http://36.111.205.166:80",
            "https": "http://36.111.205.166:80",
        },
        headers={
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
        },
        verify=False,
        timeout=20,
    )
    try:
        response = requests.get(url, **request_options)
        response.raise_for_status()
        response.encoding = 'utf-8'
        return response.cookies
    except Exception:
        logger.exception("Download HTML Text failed")
    return None
def getHTMLText(url, cookies):
    """Fetch ``url`` through the fixed proxy and return the body as text.

    ``cookies`` are sent with the request and the response is decoded as
    UTF-8.  Returns ``None`` (after logging the exception) when the download
    fails or the server answers with an HTTP error status.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_options = dict(
        proxies={
            "http": "http://36.111.205.166:80",
            "https": "http://36.111.205.166:80",
        },
        headers={
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
        },
        cookies=cookies,
        verify=False,
        timeout=20,
    )
    try:
        response = requests.get(url, **request_options)
        response.raise_for_status()
        response.encoding = 'utf-8'
        return response.text
    except Exception:
        logger.exception("Download HTML Text failed")
    return None
def getStations():
    """Download the station list and return a name -> code mapping.

    Every ``<Chinese name>|<UPPERCASE CODE>`` pair found in the downloaded
    script becomes one dictionary entry.  An HTTP error status raises via
    ``raise_for_status``.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    stations_url = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js?station_version=1.8971'
    page = requests.get(stations_url, verify=False)
    page.raise_for_status()

    entries = re.findall(r'[\u4e00-\u9fa5]+\|[A-Z]+', page.text)
    # Each match holds exactly one "|" (neither character class allows it),
    # so every split yields a (name, code) pair.
    return dict(str(entry).split('|') for entry in entries)
def runnessus(self, url, api_akey, api_skey, policy, targets, scan_name, insecure):
    """Launch a Nessus scan and return its exported report as a string.

    The scan is created with the given policy and targets, launched, waited
    on through ``self.scanstatus``, then downloaded in ``nessus`` format
    with stdout suppressed.
    """
    if insecure:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    scanner = ness6rest.Scanner(url=url, api_akey=api_akey,
                                api_skey=api_skey, insecure=insecure)
    scanner.policy_set(name=policy)
    logging.debug("TARGETS: {t}".format(t=targets))
    scanner.scan_add(targets=targets, name=scan_name)

    launch_path = "scans/" + str(scanner.scan_id) + "/launch"
    scanner.action(action=launch_path, method="POST")
    scanner.scan_uuid = scanner.res["scan_uuid"]

    print("{i} SCAN NAME: {n}".format(i=ctinfo, n=scanner.scan_name))
    print("{i} SCAN UUID: {n}".format(i=ctinfo, n=scanner.scan_uuid))

    self.scanstatus(scanner.tag_id, scanner.scan_uuid, url,
                    api_akey, api_skey, insecure)

    with suppress_stdout():
        report = str(scanner.download_scan(export_format="nessus"))
    return report
def cli():
    """command-line interface

    Parses the ``<from>``, ``<to>`` and ``<date>`` arguments, queries the
    ticket service and pretty-prints the returned trains.
    """
    args = docopt(__doc__)
    from_station = stations.get(args['<from>'])
    to_station = stations.get(args['<to>'])
    date = args['<date>']

    # URL
    query_template = 'https://kyfw.12306.cn/otn/lcxxcx/query?purpose_codes=ADULT&queryDate={}&from_station={}&to_station={}'
    url = query_template.format(date, from_station, to_station)

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    r = requests.get(url, verify=False)
    print(r)

    TrainCollection(r.json()['data']['datas']).pretty_print()
def __init__(self, apiKey, server, username=None, password=None):
    """Create a client that authenticates by API key or by user/password.

    Args:
        apiKey: API key.  When falsy, ``username``/``password`` are stored
            instead and an XSRF token is read from the server's cookies.
        server: Base URL starting with ``http://`` or ``https://``; a
            trailing ``/`` is appended when missing.
        username: Login name, kept when no API key is given.
        password: Login password, kept when no API key is given.

    Raises:
        ValueError: if ``server`` is missing or not an http(s) URL, or if
            neither an API key nor a username/password pair is given.
    """
    import warnings

    if not ((apiKey or (username and password)) and server):
        raise ValueError("You must provide server argument and key or user & password")
    if not server.startswith(('https://', 'http://')):
        raise ValueError("Server must be a url (e.g. 'https://<server>' or 'http://<server>')")
    if not server.endswith('/'):
        server += '/'

    self.server = server
    self.session = Session()
    if not apiKey:
        self.xsrf = True
        # verify=False triggers an InsecureRequestWarning.  Filter it for
        # this call only; catching it as an exception would leave ``r``
        # unbound whenever warnings are promoted to errors.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", InsecureRequestWarning)
            r = self.session.get(server, verify=False)
        self.token = r.cookies[DemistoClient.XSRF_COOKIE_KEY]
        self.username = username
        self.password = password
    else:
        self.xsrf = False
        self.apiKey = apiKey
def req(self, method, path, data):
    """Send a JSON request to ``self.server + path`` and return the response.

    Args:
        method: HTTP method name passed to the session.
        path: Path appended to ``self.server``.
        data: Object sent as the JSON body.

    Returns:
        The response object produced by the session.

    Raises:
        RuntimeError: if ``self.session`` is not set.
    """
    import warnings

    h = {"Accept": "application/json",
         "Content-type": "application/json"}
    if self.xsrf:
        h[DemistoClient.XSRF_TOKEN_KEY] = self.token
    else:
        h[DemistoClient.AUTHORIZATION] = self.apiKey

    if not self.session:
        raise RuntimeError("Session not initialized!")

    # verify=False triggers an InsecureRequestWarning.  Filter it for this
    # call only; catching it as an exception would leave the response
    # unbound and turn the final ``return`` into an UnboundLocalError.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", InsecureRequestWarning)
        return self.session.request(method, self.server + path,
                                    headers=h, verify=False, json=data)
def __init__(self, http_config):
    """Build the adapter from an ``HTTPConfig``.

    The retry limit comes from ``http_config.max_retries``.  When the
    configuration does not ask for certificate verification, urllib3's
    ``InsecureRequestWarning`` is disabled.
    """
    assert isinstance(http_config, HTTPConfig), "not an instance of HTTPConfig"
    self.http_config = http_config
    super(CustomHTTPAdapter, self).__init__(max_retries=http_config.max_retries)
    self._logger = logging.getLogger(__name__)

    verification_wanted = http_config.verify_ssl_cert
    if not verification_wanted:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# pylint: disable=R0913
def __init__(self):
    """Start with empty connection settings and quiet HTTP logging.

    The ``requests`` logger is raised to CRITICAL and urllib3's
    ``InsecureRequestWarning`` is disabled.
    """
    # Connection details
    self.__host = self.__user = self.__pwd = None
    self.__SSL_VERIFY = False
    self.__POST_HEADERS = None
    self.__APIVER = None

    # TODO: If log level is DEBUG avoid this code
    requests_logger = logging.getLogger('requests')
    requests_logger.setLevel(logging.CRITICAL)
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def create_unverified_session(session, suppress_warning=True):
    """Switch off server certificate verification on ``session``.

    This is not recommended in production code.

    Args:
        session: Object whose ``verify`` attribute is set to ``False``.
        suppress_warning: When truthy, also disable urllib3's
            ``InsecureRequestWarning``.

    Returns:
        The same ``session`` object.
    """
    session.verify = False
    if not suppress_warning:
        return session

    # Suppress unverified https request warnings
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    return session
def __init__(self, hostname, username, password, verify=True, prefix=""):
    """
    Constructor, creating the class. It requires specifying a
    hostname, username and password to access the API. After
    initialization, a connection is established.

    :param hostname: Foreman host
    :type hostname: str
    :param username: API username
    :type username: str
    :param password: corresponding password
    :type password: str
    :param verify: force SSL verification
    :type verify: bool
    :param prefix: API prefix (e.g. /katello)
    :type prefix: str
    """
    # disable SSL warning outputs
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # set connection information
    self.HOSTNAME = self.validate_hostname(hostname)
    self.USERNAME, self.PASSWORD = username, password
    self.VERIFY = verify
    self.URL = "https://{0}{1}/api/v2".format(self.HOSTNAME, prefix)

    # start session; the API version is only checked for the plain
    # Foreman API (empty prefix)
    self.__connect()
    if prefix == "":
        self.validate_api_support()
def init(self, out_stream, url, proxy=None, g_stop_event=None, maxbitrate=0):
    """Prepare the downloader state for ``url``.

    Args:
        out_stream: Stored as ``self.out_stream``.
        url: Source URL, optionally followed by ``|`` and a query-string
            style header list that is parsed into ``clientHeader``.
        proxy: Proxy setting; an empty value is normalised to ``None``.
        g_stop_event: Optional event that is cleared and stored as
            ``self.g_stopEvent``.
        maxbitrate: Stored as ``self.maxbitrate``.

    Returns:
        ``True`` on success, ``False`` if any step raised (the traceback
        is printed).
    """
    global clientHeader, gproxy, session, use_proxy
    try:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        session = requests.Session()
        session.cookies = cookieJar
        self.init_done = False
        self.init_url = url
        clientHeader = None
        self.proxy = proxy
        use_proxy = False
        # An empty proxy is falsy, so a truthiness test combined with a
        # length test can never match; compare against None instead.
        if self.proxy is not None and len(self.proxy) == 0:
            self.proxy = None
        gproxy = self.proxy
        self.out_stream = out_stream
        if g_stop_event:
            g_stop_event.clear()
        self.g_stopEvent = g_stop_event
        self.maxbitrate = maxbitrate
        if '|' in url:
            sp = url.split('|')
            url = sp[0]
            clientHeader = sp[1]
            log(clientHeader)
            clientHeader = urlparse.parse_qsl(clientHeader)
            log('header received now url and headers are %s | %s' % (url, clientHeader))
        self.url = url
        return True
    except Exception:
        # Best-effort initialisation: report the failure and return False,
        # but let KeyboardInterrupt/SystemExit propagate.
        traceback.print_exc()
        return False
def __init__(self, base_url, username, password, verify, organization):
    """Open an authenticated session and check the credentials.

    :param base_url: Url to connect to
    :type base_url: str
    :param username: Username for querying
    :type username: str
    :param password: Password for querying
    :type password: str
    :param verify: Value assigned to the session's ``verify`` setting; when
        falsy the ``InsecureRequestWarning`` is disabled as well
    :type verify: bool
    :param organization: Organization to use
    :type organization: str
    :raises AuthenticationError: if the organizations listing has no
        ``results`` key
    """
    self.organization = organization
    self.base_url = base_url
    self.post_headers = {'Content-Type': 'application/json'}

    http = requests.Session()
    http.auth = (username, password)
    http.verify = verify
    self.session = http

    if not verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # Check if we can authenticate
    organizations = self.session.get('%s/katello/api/v2/organizations' % self.base_url).json()
    if 'results' not in organizations:
        raise AuthenticationError('Authentication failed')
def validate_cert(self, validate_cert: bool):
    """Set certificate validation on the underlying caller.

    Raises:
        ValueError: if ``validate_cert`` is not a ``bool``.
    """
    if not isinstance(validate_cert, bool):
        raise ValueError(f"{type(validate_cert)} is not a valid validate_cert argument type")

    self._caller.validate_cert = bool(validate_cert)
    if validate_cert:
        return

    # Warnings in urllib3 can only be disabled, not reenabled
    urllib3.disable_warnings(InsecureRequestWarning)
def __init__(self):
    """Select the backend and start with empty credentials.

    urllib3's ``InsecureRequestWarning`` is disabled as well.
    """
    self._backend = PY_ACTIVE_SYNC
    self._creds = dict(
        server=None,
        user=None,
        password=None,
        domain=None,     # This could be optional.
        device_id=None,  # This could be optional.
    )
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def create_room(room_name, token):
    """Create a room titled ``room_name`` and print the HTTP response.

    Args:
        room_name: Title of the room to create.
        token: Bearer token placed in the ``Authorization`` header.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    body = json.dumps({'title': room_name})
    resp = requests.post('https://api.ciscospark.com/v1/rooms',
                         verify=False, headers=headers, data=body)
    # Function-call form prints the same on Python 2 and is valid on Python 3.
    print(resp)
def list_rooms(token):
    """Request the room listing and return the HTTP response object."""
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {'Authorization': 'Bearer ' + token,
                       'Content-Type': 'application/json'}
    return requests.get('https://api.ciscospark.com/v1/rooms',
                        verify=False,
                        headers=request_headers)
def list_messages(room_id, token):
    """Request the messages of ``room_id`` and return the response text."""
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {'Authorization': 'Bearer ' + token,
                       'Content-Type': 'application/json'}
    messages_url = 'https://api.ciscospark.com/v1/messages?roomId={}'.format(room_id)
    listing = requests.get(messages_url, verify=False, headers=request_headers)
    return listing.text
def cleanup_room(room_id, token):
    """Delete every message listed for ``room_id``.

    The HTTP status code of each delete request is printed.

    Args:
        room_id: Room whose messages are removed.
        token: Bearer token placed in the ``Authorization`` header.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    messages = json.loads(list_messages(room_id, token))
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    for message in messages['items']:
        resp = requests.delete(
            'https://api.ciscospark.com/v1/messages/{}'.format(message['id']),
            verify=False, headers=headers)
        # Function-call form prints the same on Python 2 and is valid on
        # Python 3.
        print(resp.status_code)
def send_message(self, message):
    """Send ``message`` as an SMS through the SMS Eagle HTTP API.

    The outcome is only logged; nothing is returned and request failures
    are not raised to the caller.

    Args:
        message: message to send
    """
    # set parameters
    target = self.get_parameter("target")
    url = self.get_parameter("url")
    url_parameters = {
        "login": self.get_parameter("user"),
        "pass": self.get_parameter("password"),
        "to": target,
        "message": message
    }

    # send HTTP GET request to SMS Eagle
    try:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        response = requests.get(url, params=url_parameters)
    except requests.exceptions.RequestException:
        # Only request failures count as "could not connect"; programming
        # errors and KeyboardInterrupt are no longer swallowed.
        self._logger.error("Could not connect to SMS Eagle")
        return

    # check response
    if response.status_code != 200:
        self._logger.error("Could not send SMS to %s: %s", target, message)
    else:
        self._logger.info("Send SMS to %s: %s", target, message)
def test_init_without_cacert(self, disable_warning_mock):
    """Without a CA certificate the insecure-request warning is disabled."""
    connector = v1.RestConnectorBase(
        "x.x.x.x", "admin", "Admin", bios_password='foo')

    # The positional arguments are stored on the instance.
    self.assertEqual(connector.host, "x.x.x.x")
    self.assertEqual(connector.login, "admin")
    self.assertEqual(connector.password, "Admin")
    self.assertIsNone(connector.cacert)

    expected_warning = urllib3_exceptions.InsecureRequestWarning
    disable_warning_mock.assert_called_once_with(expected_warning)
def test_init_without_cacert(self, disable_warning_mock):
    """Without a CA certificate the insecure-request warning is disabled."""
    operations = ris.RISOperations(
        "x.x.x.x", "admin", "Admin", bios_password='foo')

    # The positional arguments are stored on the instance.
    self.assertEqual(operations.host, "x.x.x.x")
    self.assertEqual(operations.login, "admin")
    self.assertEqual(operations.password, "Admin")
    self.assertIsNone(operations.cacert)

    expected_warning = urllib3_exceptions.InsecureRequestWarning
    disable_warning_mock.assert_called_once_with(expected_warning)
def test_init_without_cacert(self, disable_warning_mock):
    """Without a CA certificate the insecure-request warning is disabled."""
    operations = ribcl.RIBCLOperations("x.x.x.x", "admin", "Admin", 60, 443)

    # The positional arguments are stored on the instance.
    self.assertEqual(operations.host, "x.x.x.x")
    self.assertEqual(operations.login, "admin")
    self.assertEqual(operations.password, "Admin")
    self.assertEqual(operations.timeout, 60)
    self.assertEqual(operations.port, 443)
    self.assertIsNone(operations.cacert)

    expected_warning = urllib3_exceptions.InsecureRequestWarning
    disable_warning_mock.assert_called_once_with(expected_warning)
def _suppress_ssl_warning(self):
    """Disable urllib3's ``InsecureRequestWarning``.

    An ``AttributeError`` while doing so is logged as a warning instead of
    being raised.
    """
    # Disable https warning because of non-standard certs on appliance
    try:
        self.logger.debug("Suppressing SSL Warnings.")
        bundled_urllib3 = requests.packages.urllib3
        bundled_urllib3.disable_warnings(InsecureRequestWarning)
    except AttributeError:
        self.logger.warning("load requests.packages.urllib3.disable_warnings() failed")
def __init__(self, filename = None, activity = None, name = None, description = None, private = False, trainer = False, commute = False, format = 'gpx', handle = None, apiKey = None):
    """Reset the instance state and mute the insecure-request warning.

    NOTE(review): none of the constructor arguments is read in the visible
    body; ``self.apiKey`` is set to ``None`` regardless of ``apiKey``.
    Confirm whether ``reset()`` or later code is expected to apply them.
    """
    self.apiKey = None
    self.reset()

    # turn off the warning because we're not checking the cert
    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(InsecureRequestWarning)
def attach_disk(bkpid,diskid,snapid):
    """POST a request that attaches a disk snapshot to the VM ``bkpid``.

    The XML body names the disk ``diskid`` and the snapshot ``snapid`` and
    marks the disk active.  The response is not inspected.
    """
    attach_xml = '<disk id="' + diskid + '"><snapshot id="' + snapid + '"/> <active>true</active></disk>'
    attach_url = url + "/v3/vms/" + bkpid + "/disks/"
    xml_headers = {'Content-Type': 'application/xml',
                   'Accept': 'application/xml'}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    resp_attach = requests.post(attach_url,
                                data=attach_xml,
                                headers=xml_headers,
                                verify=False,
                                auth=(user, password))
# Function to deactivate virtual disk
def detach_disk(bkpid,diskid):
    """Send a DELETE for disk attachment ``diskid`` of the VM ``bkpid``.

    The response is not inspected.
    """
    delete_url = url + "/vms/" + bkpid + "/diskattachments/" + diskid
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    credentials = (user, password)
    requests.delete(delete_url, verify=False, auth=credentials)
# Function to get device name of a virtual disk
def _import_file(self, filename):
    """Upload ``filename`` to the device as a configuration import.

    Args:
        filename: Path of the local file to upload.

    Returns:
        The base name of the uploaded file, or ``False`` when the device
        answers with ``status="error"``.

    Raises:
        requests.exceptions.HTTPError: if the HTTP status signals an error
            (via ``raise_for_status``).
    """
    key = self.api_key if self.api_key else self.device.keygen()
    params = {
        'type': 'import',
        'category': 'configuration',
        'key': key
    }
    path = os.path.basename(filename)
    url = 'https://{0}/api/'.format(self.hostname)

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # The encoder holds the file object and is itself the request body, so
    # the handle stays open until the POST returns and is then closed.
    with open(filename, 'rb') as upload:
        mef = requests_toolbelt.MultipartEncoder(
            fields={
                'file': (path, upload, 'application/octet-stream')
            }
        )
        request = requests.post(
            url,
            verify=False,
            params=params,
            headers={'Content-Type': mef.content_type},
            data=mef
        )

    # if something goes wrong just raise an exception
    request.raise_for_status()
    response = xml.etree.ElementTree.fromstring(request.content)
    if response.attrib['status'] == 'error':
        return False
    return path
def ignore_warnings():
    """Disable four urllib3 warning categories, in a fixed order."""
    silenced_categories = (
        InsecureRequestWarning,
        SubjectAltNameWarning,
        InsecurePlatformWarning,
        SNIMissingWarning,
    )
    for category in silenced_categories:
        requests.packages.urllib3.disable_warnings(category)
def issue_request(self, method, params, endpoint=None):
    """Send one JSON-RPC call to the SolidFire API and return its result.

    Every call is recorded in ``self.request_history`` as
    ``(method, start_time, duration, 'ok' | 'failed')``.

    Args:
        method: JSON-RPC method name.
        params: Parameters for the call; ``None`` is sent as ``{}``.
        endpoint: Optional endpoint used instead of ``self.endpoint_dict``.
            Assumed to carry the same ``url``/``login``/``password`` keys
            -- TODO confirm against callers.

    Returns:
        The ``result`` member of the decoded response.

    Raises:
        sfexceptions.SolidFireRequestException: if the response contains
            an ``error`` member.
    """
    if params is None:
        params = {}

    # NOTE(jdg): We allow passing in a new endpoint to issue_api_req
    # to enable some of the multi-cluster features like replication etc
    # A supplied endpoint must be used; otherwise endpoint_dict would be
    # unbound below.
    endpoint_dict = self.endpoint_dict if endpoint is None else endpoint

    payload = json.dumps({'method': method, 'params': params})
    url = '%s/json-rpc/%s/' % (endpoint_dict['url'], self.endpoint_version)
    LOG.debug('Issue SolidFire API call: %s', payload)

    start_time = time.time()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
        req = requests.post(url,
                            data=payload,
                            auth=(endpoint_dict['login'],
                                  endpoint_dict['password']),
                            verify=False,
                            timeout=30)
    # FIXME(jdg): Failure cases like wrong password
    # missing something that cause req.json to puke
    try:
        response = req.json()
    finally:
        # Release the connection even when decoding fails.
        req.close()
    duration = time.time() - start_time

    LOG.debug('Raw response data from SolidFire API: %s', response)

    # TODO(jdg): Add check/retry catch for things where it's appropriate
    if 'error' in response:
        # Format the message; a trailing ", response" would build a tuple.
        msg = 'API response: %s' % (response,)
        self.request_history.append(
            (method, start_time, duration, 'failed'))
        LOG.error('Error in API request: %s', response['error'])
        raise sfexceptions.SolidFireRequestException(msg)

    self.request_history.append((method, start_time, duration, 'ok'))
    return response['result']
def send_request(self, method, params, version='1.0', endpoint=None):
    """Send one JSON-RPC call to the SolidFire API and return its result.

    Args:
        method: JSON-RPC method name.
        params: Parameters for the call; ``None`` is sent as ``{}``.
        version: API version segment of the request URL.
        endpoint: Optional endpoint used instead of ``self.endpoint_dict``.
            Assumed to carry the same ``url``/``login``/``password`` keys
            -- TODO confirm against callers.

    Returns:
        The ``result`` member of the decoded response.

    Raises:
        SolidFireRequestException: if the response contains an ``error``
            member.
    """
    if params is None:
        params = {}

    # NOTE(jdg): We allow passing in a new endpoint to issue_api_req
    # to enable some of the multi-cluster features like replication etc
    # A supplied endpoint must be used; otherwise endpoint_dict would be
    # unbound below.
    endpoint_dict = self.endpoint_dict if endpoint is None else endpoint

    payload = json.dumps({'method': method, 'params': params})
    url = '%s/json-rpc/%s/' % (endpoint_dict['url'], version)
    LOG.debug('Issue SolidFire API call: %s', payload)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
        req = requests.post(url,
                            data=payload,
                            auth=(endpoint_dict['login'],
                                  endpoint_dict['password']),
                            verify=False,
                            timeout=30)
    # TODO(jdg): Failure cases like wrong password
    # missing something that cause req.json to puke
    try:
        response = req.json()
    finally:
        # Release the connection even when decoding fails.
        req.close()

    LOG.debug('Raw response data from SolidFire API: %s', response)

    # TODO(jdg): Add check/retry catch for things where it's appropriate
    if 'error' in response:
        # Format the message; a trailing ", response" would build a tuple.
        msg = 'API response: %s' % (response,)
        raise SolidFireRequestException(msg)

    return response['result']
def live_dangerously(self):
    """
    Disable SSL checking.

    Note that this should *only* be used while developing against a
    local Socrata instance.
    """
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning as insecure_warning

    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(insecure_warning)
    self.verify = False
def __init__(self, **kwargs):
    """Configure the Direct Line connection and start a conversation.

    Recognised keyword arguments are ``directline_host`` (defaults to the
    public Direct Line host) and ``direct_line_token_or_secret``.  All
    keyword arguments are also forwarded to the parent constructor.
    """
    super(Microsoft, self).__init__(**kwargs)

    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    default_host = 'https://directline.botframework.com'
    self.directline_host = kwargs.get('directline_host', default_host)

    # NOTE: Direct Line client credentials are different from your bot's
    # credentials
    secret = kwargs.get('direct_line_token_or_secret')
    self.direct_line_token_or_secret = secret

    self.headers = {
        'Authorization': 'BotConnector {}'.format(self.direct_line_token_or_secret),
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'charset': 'utf-8'
    }

    conversation = self.start_conversation()
    self.conversation_id = conversation.get('conversationId')
    self.conversation_token = conversation.get('token')
def setUpClass(cls):
    """Create the shared remote, a pre-existing group and a logged-in user.

    All warnings are filtered out and SSL certificate verification is
    switched off for the remote's services.
    """
    warnings.filterwarnings('ignore')
    remote = BossRemote('test.cfg', API_VER)
    cls.rmt = remote

    # Turn off SSL cert verification.  This is necessary for interacting
    # with developer instances of the Boss.
    for service in (remote.project_service,
                    remote.metadata_service,
                    remote.volume_service):
        service.session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    cls.create_grp_name = 'int_test_group{}'.format(random.randint(0, 9999))
    cls.existing_grp_name = 'int_test_group_exists{}'.format(random.randint(0, 9999))
    remote.create_group(cls.existing_grp_name)

    cls.user_name = 'bossadmin'

    # Create a new user because the user tests run under, will
    # automatically be a member and maintainer of any groups created
    # during testing.
    cls.created_user = 'group_test_johndoeski{}'.format(random.randint(0, 9999))
    password = 'myPassW0rd'
    email = 'jdoe{}@rime.com'.format(random.randint(0, 9999))
    remote.add_user(cls.created_user, 'John', 'Doeski', email, password)

    cls.login_user(cls.get_access_token(cls.created_user, password))
def setUpClass(cls):
    """Create the shared remote plus the resources and group under test.

    A collection, coordinate frame, experiment and channel are defined and
    created on the remote, followed by a test group.
    """
    remote = BossRemote('test.cfg', API_VER)
    cls.rmt = remote

    # Turn off SSL cert verification.  This is necessary for interacting
    # with developer instances of the Boss.
    for service in (remote.project_service,
                    remote.metadata_service,
                    remote.volume_service):
        service.session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    coll_name = 'collection_perm_test-{}'.format(random.randint(0, 9999))
    cls.coll = CollectionResource(coll_name, 'bar')

    cf_name = 'PermissionTestFrame{}'.format(random.randint(0, 9999))
    cls.coord = CoordinateFrameResource(
        cf_name, 'Test coordinate frame.', 0, 10, -5, 5, 3, 6,
        1, 1, 1, 'nanometers', 0, 'nanoseconds')

    cls.exp = ExperimentResource(
        'perm_test_exp', cls.coll.name, cls.coord.name, 'my experiment',
        1, 'isotropic', 1)

    cls.chan = ChannelResource(
        'perm_test_ch', cls.coll.name, cls.exp.name, 'image',
        'test channel', 0, 'uint8', 0)

    cls.grp_name = 'int_perm_test_group{}'.format(random.randint(0, 9999))

    # Created in this order: collection, coordinate frame, experiment,
    # channel.
    for resource in (cls.coll, cls.coord, cls.exp, cls.chan):
        remote.create_project(resource)
    remote.create_group(cls.grp_name)
def __init__(self, service_information, proxies=None):
    """Store the service description and reset the authorization state.

    Args:
        service_information: Service settings; when its
            ``skip_ssl_verifications`` flag is truthy, the "Unverified
            HTTPS request" warning is filtered out.
        proxies: Proxy mapping; defaults to empty ``http``/``https``
            entries when ``None``.
    """
    self.service_information = service_information
    self.proxies = dict(http='', https='') if proxies is None else proxies
    self.authorization_code_context = None
    self.refresh_token = None
    self._session = None

    if not service_information.skip_ssl_verifications:
        return

    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    import warnings

    warnings.filterwarnings('ignore',
                            'Unverified HTTPS request is being made.*',
                            InsecureRequestWarning)
def main():
    """Mute the insecure-request warning, then run the CLI."""
    # We silence the insecure requests warnings we get for using
    # self-signed certificates.
    disable_warnings(InsecureRequestWarning)

    context = RemoteAppRestContext()
    cli(obj=context)
def __init__(self, log_level, hostname, username, password, verify=True, prefix=""):
    """
    Constructor, creating the class. It requires specifying a
    hostname, username and password to access the API. After
    initialization, a connection is established.

    :param log_level: log level
    :type log_level: logging
    :param hostname: Foreman host
    :type hostname: str
    :param username: API username
    :type username: str
    :param password: corresponding password
    :type password: str
    :param verify: force SSL verification
    :type verify: bool
    :param prefix: API prefix (e.g. /katello)
    :type prefix: str
    """
    # set logging
    self.LOGGER.setLevel(log_level)

    # disable SSL warning outputs
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # set connection information
    self.HOSTNAME = self.validate_hostname(hostname)
    self.USERNAME, self.PASSWORD = username, password
    self.VERIFY = verify
    self.URL = "https://{0}{1}/api/v2".format(self.HOSTNAME, prefix)

    # start session; the API version is only checked for the plain
    # Foreman API (empty prefix)
    self.__connect()
    if prefix == "":
        self.validate_api_support()
def __init__ (self):
    """Disable urllib3's ``InsecureRequestWarning`` on construction."""
    # Disable insecure-certificate-warning message
    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(InsecureRequestWarning)
def perform_query(self, method, path, data = {}, headers = None):
    """set up connection and perform query

    Args:
        method: HTTP method name passed to the session.
        path: Path appended to ``self.url``.
        data: Object serialised to JSON as the request body.  The shared
            default is only read, never modified.
        headers: Request headers; ``self.default_headers`` when ``None``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: if the response status signals an
            error (via ``raise_for_status``).
    """
    if headers is None:
        headers = self.default_headers

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
        resp = self.session.request(method,
                                    self.url + path,
                                    data=json.dumps(data),
                                    verify=self.verify,
                                    headers=headers)

    # An HTTPError propagates unchanged; catching it only to re-raise added
    # nothing and used Python-2-only ``except X, e`` syntax.
    resp.raise_for_status()
    return resp.json()