Python requests.packages.urllib3.exceptions 模块,InsecureRequestWarning() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.packages.urllib3.exceptions.InsecureRequestWarning()

项目:jscanner    作者:tampe125    | 项目源码 | 文件源码
def checkenv(self):
        try:
            import requests.packages.urllib3
        except ImportError:
            raise Exception('requests package not installed. Run pip install -r requirements.txt and try again.')

        # Disable warnings about SSL connections
        try:
            from requests.packages.urllib3.exceptions import InsecureRequestWarning
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        except ImportError:
            pass

        try:
            from requests.packages.urllib3.exceptions import InsecurePlatformWarning
            requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
        except ImportError:
            pass
项目:tfs    作者:devopshq    | 项目源码 | 文件源码
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
        if not base_url.endswith('/'):
            base_url += '/'

        collection, project = self.get_collection_and_project(project)
        # Remove part after / in project-name, like DefaultCollection/MyProject => DefaultCollection
        # API responce only in Project, without subproject
        self._url = base_url + '%s/_apis/' % collection
        if project:
            self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
        else:
            self._url_prj = self._url

        self.http_session = requests.Session()
        auth = auth_type(user, password)
        self.http_session.auth = auth

        self.timeout = timeout
        self._verify = verify
        if not self._verify:
            from requests.packages.urllib3.exceptions import InsecureRequestWarning
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
项目:clus2017_iosxe_demo_booth    作者:CiscoDevNet    | 项目源码 | 文件源码
def get_room_id(room_name, token):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    id = ""

    headers = {'Authorization':'Bearer '+token,
                'Content-Type':'application/json'}
    resp = requests.get('https://api.ciscospark.com/v1/rooms',
    verify=False,headers=headers,proxies=proxies)

    if resp.status_code == 200:

        rooms = json.loads(resp.text)['items']

        for room in rooms:
            if room['title'] == room_name:
                id = room['id']

    return id
项目:clus2017_iosxe_demo_booth    作者:CiscoDevNet    | 项目源码 | 文件源码
def post_message(message_text, room_id, token, markdown=False):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    headers = {'Authorization':'Bearer '+token,
                'Content-Type':'application/json'}

    if markdown:
        body = json.dumps({'roomId':room_id,'markdown':message_text})
    else:
        body = json.dumps({'roomId':room_id,'text':message_text})

    resp = requests.post('https://api.ciscospark.com/v1/messages',
    verify=False,headers=headers,data=body,proxies=proxies)

    return resp
项目:opennms_alarmforwarder    作者:NETHINKS    | 项目源码 | 文件源码
def test_connection(self):
        """tests connection to OpenNMS REST API and returns HTTP status code
        returns -1, if there was an error connecting to the server
        """
        # return code
        output = -1

        # config
        config_rest_url = self.__source.source_url
        config_rest_user = self.__source.source_user
        config_rest_pw = self.__source.source_password

        # test connection to OpenNMS REST API
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        request_url = config_rest_url + "/info"
        try:
            response = requests.get(request_url, auth=(config_rest_user, config_rest_pw),
                                    verify=False)
        except:
            return output
        return response.status_code
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def __init__(self, host, login, password, bios_password=None,
                 cacert=None):
        self.host = host
        self.login = login
        self.password = password
        self.bios_password = bios_password
        # Message registry support
        self.message_registries = {}
        self.cacert = cacert

        # By default, requests logs following message if verify=False
        #   InsecureRequestWarning: Unverified HTTPS request is
        #   being made. Adding certificate verification is strongly advised.
        # Just disable the warning if user intentionally did this.
        if self.cacert is None:
            urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def __init__(self, host, login, password, timeout=60, port=443,
                 cacert=None):
        """Constructor for RIBCLOperations.

        """
        self.host = host
        self.login = login
        self.password = password
        self.timeout = timeout
        self.port = port
        self.cacert = cacert

        # By default, requests logs following message if verify=False
        #   InsecureRequestWarning: Unverified HTTPS request is
        #   being made. Adding certificate verification is strongly advised.
        # Just disable the warning if user intentionally did this.
        if self.cacert is None:
            urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def setUpClass(cls):
        """Do an initial DB clean up in case something went wrong the last time.

        If a test failed really badly, the DB might be in a bad state despite
        attempts to clean up during tearDown().
        """
        cls.rmt = BossRemote('test.cfg', API_VER)

        # Turn off SSL cert verification.  This is necessary for interacting with
        # developer instances of the Boss.
        cls.rmt.project_service.session_send_opts = {'verify': False}
        cls.rmt.metadata_service.session_send_opts = {'verify': False}
        cls.rmt.volume_service.session_send_opts = {'verify': False}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        cls.admin = 'admin'
        cls.user_mgr = 'user-manager'
        cls.rsrc_mgr = 'resource-manager'
        cls.user = 'role_test_user{}'.format(random.randint(0, 9999))

        cls.rmt.add_user(cls.user, 'John', 'Doe', 'jd{}@me.com'.format(random.randint(0, 9999)), 'password')
        cls.cleanup_db()
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def setUpClass(cls):
        """Do an initial DB clean up in case something went wrong the last time.

        If a test failed really badly, the DB might be in a bad state despite
        attempts to clean up during tearDown().
        """
        warnings.filterwarnings('ignore')

        cls.rmt = BossRemote('test.cfg', API_VER)

        # Turn off SSL cert verification.  This is necessary for interacting with
        # developer instances of the Boss.
        cls.rmt.project_service.session_send_opts = {'verify': False}
        cls.rmt.metadata_service.session_send_opts = {'verify': False}
        cls.rmt.volume_service.session_send_opts = {'verify': False}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        cls.user = 'user_test_user{}'.format(random.randint(0, 9999))
        cls.first_name = 'john'
        cls.last_name = 'doe'
        cls.email = 'jd{}@me.com'.format(random.randint(0, 9999))
        cls.password = 'password'
项目:reptile    作者:shawncan    | 项目源码 | 文件源码
def getHTMLCookies(url):
    """
    ????????
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    proxies = {
        "http": "http://36.111.205.166:80",
        "https": "http://36.111.205.166:80"
    }

    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
    }

    try:
        r = requests.get(url, proxies=proxies, headers=headers, verify=False, timeout=20)
        r.raise_for_status()
        r.encoding = 'utf-8'
        return r.cookies
    except Exception:
        logger.exception("Download HTML Text failed")
项目:reptile    作者:shawncan    | 项目源码 | 文件源码
def getHTMLText(url, cookies):
    """
    ????????
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    proxies = {
        "http": "http://36.111.205.166:80",
        "https": "http://36.111.205.166:80"
    }

    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
    }

    try:
        r = requests.get(url, proxies=proxies, headers=headers, cookies=cookies, verify=False, timeout=20)
        r.raise_for_status()
        r.encoding = 'utf-8'
        return r.text
    except Exception:
        logger.exception("Download HTML Text failed")
项目:reptile    作者:shawncan    | 项目源码 | 文件源码
def getStations():
    """
    ???????????
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    st_dict = {}
    stations_url = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js?station_version=1.8971'

    html = requests.get(stations_url, verify=False)
    html.raise_for_status()
    stations_list = re.findall(r'[\u4e00-\u9fa5]+\|[A-Z]+', html.text)

    for stations in stations_list:
        area = str(stations).split('|')
        stations_name = area[0]
        stations_code = area[1]
        st_dict[stations_name] = stations_code

    return st_dict
项目:faze    作者:KhasMek    | 项目源码 | 文件源码
def runnessus(self, url, api_akey, api_skey, policy, targets, scan_name, insecure):
        if insecure:
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        scan = ness6rest.Scanner(url=url, api_akey=api_akey, api_skey=api_skey,
                                 insecure=insecure)
        scan.policy_set(name=policy)
        logging.debug("TARGETS: {t}".format(t=targets))
        scan.scan_add(targets=targets, name=scan_name)
        scan.action(action="scans/" + str(scan.scan_id) + "/launch",
                    method="POST")
        scan.scan_uuid = scan.res["scan_uuid"]
        print("{i}   SCAN NAME: {n}".format(i=ctinfo, n=scan.scan_name))
        print("{i}   SCAN UUID: {n}".format(i=ctinfo, n=scan.scan_uuid))
        self.scanstatus(scan.tag_id, scan.scan_uuid, url, api_akey, api_skey,
                        insecure)
        with suppress_stdout():
            output = str(scan.download_scan(export_format="nessus"))
        return output
项目:littleProject    作者:dzqoo    | 项目源码 | 文件源码
def cli():
    """command-line interface"""
    arguments = docopt(__doc__)
    from_station = stations.get(arguments['<from>'])
    to_station = stations.get(arguments['<to>'])
    date = arguments['<date>']  
    # URL
    url = 'https://kyfw.12306.cn/otn/lcxxcx/query?purpose_codes=ADULT&queryDate={}&from_station={}&to_station={}'.format(
        date, from_station, to_station
    )
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    r = requests.get(url,verify = False)
    print(r)
    rows = r.json()['data']['datas']
    trains = TrainCollection(rows)
    trains.pretty_print()
项目:demisto-py    作者:demisto    | 项目源码 | 文件源码
def __init__(self, apiKey, server, username=None, password=None):
        if not ((apiKey or (username and password)) and server):
            raise ValueError("You must provide server argument and key or user & password")
        if not server.find('https://') == 0 and not server.find('http://') == 0:
            raise ValueError("Server must be a url (e.g. 'https://<server>' or 'http://<server>')")
        if not server[-1] == '/':
            server += '/'

        self.server = server
        self.session = Session()
        if not apiKey:
            self.xsrf = True
            try:
                r = self.session.get(server, verify=False)
            except InsecureRequestWarning:
                pass
            self.token = r.cookies[DemistoClient.XSRF_COOKIE_KEY]
            self.username = username
            self.password = password
        else:
            self.xsrf = False
            self.apiKey = apiKey
项目:demisto-py    作者:demisto    | 项目源码 | 文件源码
def req(self, method, path, data):
        h = {"Accept": "application/json",
             "Content-type": "application/json"}

        if self.xsrf:
            h[DemistoClient.XSRF_TOKEN_KEY] = self.token
        else:
            h[DemistoClient.AUTHORIZATION] = self.apiKey
        try:
            if self.session:
                r = self.session.request(method, self.server+path, headers=h, verify=False, json=data)
            else:
                raise RuntimeError("Session not initialized!")
        except InsecureRequestWarning:
            pass
        return r
项目:dractor    作者:VerizonDigital    | 项目源码 | 文件源码
def __init__(self, http_config):
        assert isinstance(http_config, HTTPConfig), "not an instance of HTTPConfig"
        self.http_config = http_config
        super(CustomHTTPAdapter, self).__init__(max_retries=http_config.max_retries)
        self._logger = logging.getLogger(__name__)

        if not http_config.verify_ssl_cert:
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # pylint: disable=R0913
项目:foreman-op    作者:javiroman    | 项目源码 | 文件源码
def __init__(self):
        self.__host = None
        self.__user = None
        self.__pwd = None
        self.__SSL_VERIFY = False
        self.__POST_HEADERS = None
        self.__APIVER = None

        # TODO: If log level is DEBUG avoid this code
        logging.getLogger('requests').setLevel(logging.CRITICAL)
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
项目:vsphere-automation-sdk-python    作者:vmware    | 项目源码 | 文件源码
def create_unverified_session(session, suppress_warning=True):
    """
    Create a unverified session to disable the server certificate verification.
    This is not recommended in production code.
    """
    session.verify = False
    if suppress_warning:
        # Suppress unverified https request warnings
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    return session
项目:check_katello_sync    作者:stdevel    | 项目源码 | 文件源码
def __init__(self, hostname, username, password, verify=True, prefix=""):
        """
        Constructor, creating the class. It requires specifying a
        hostname, username and password to access the API. After
        initialization, a connected is established.

        :param hostname: Foreman host
        :type hostname: str
        :param username: API username
        :type username: str
        :param password: corresponding password
        :type password: str
        :param verify: force SSL verification
        :type verify: bool
        :param prefix: API prefix (e.g. /katello)
        :type prefix: str
        """
        #disable SSL warning outputs
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        #set connection information
        self.HOSTNAME = self.validate_hostname(hostname)
        self.USERNAME = username
        self.PASSWORD = password
        self.VERIFY = verify
        self.URL = "https://{0}{1}/api/v2".format(self.HOSTNAME, prefix)
        #start session and check API version if Foreman API
        self.__connect()
        if prefix == "":
            self.validate_api_support()
项目:plugin.video.brplay    作者:olavopeixoto    | 项目源码 | 文件源码
def init(self, out_stream, url, proxy=None, g_stop_event=None, maxbitrate=0):
        global clientHeader, gproxy, session, use_proxy

        try:
            from requests.packages.urllib3.exceptions import InsecureRequestWarning
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
            session = requests.Session()
            session.cookies = cookieJar
            self.init_done = False
            self.init_url = url
            clientHeader = None
            self.proxy = proxy
            use_proxy = False

            if self.proxy and len(self.proxy) == 0:
                self.proxy = None

            gproxy = self.proxy

            self.out_stream = out_stream

            if g_stop_event: g_stop_event.clear()

            self.g_stopEvent = g_stop_event
            self.maxbitrate = maxbitrate

            if '|' in url:
                sp = url.split('|')
                url = sp[0]
                clientHeader = sp[1]
                log(clientHeader)
                clientHeader = urlparse.parse_qsl(clientHeader)
                log('header received now url and headers are %s | %s' % (url, clientHeader))

            self.url = url

            return True
        except:
            traceback.print_exc()

        return False
项目:ballista    作者:RedHatSatellite    | 项目源码 | 文件源码
def __init__(self, base_url, username, password, verify, organization):
        """
        :param base_url: Url to connect to
        :type base_url: str
        :param username: Username for querying
        :type username: str
        :param password: Password for querying
        :type password: str
        :param verify: Whether to accept self-signed ssl
        :type verify: bool
        :param organization: Organization to use
        :type organization: str
        :returns: KatelloConnection object
        :rtype: KatelloConnection
        """
        self.organization = organization
        self.base_url = base_url
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.verify = verify
        self.post_headers = {'Content-Type': 'application/json'}
        if not verify:
            from requests.packages.urllib3.exceptions import InsecureRequestWarning
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        # Check if we can authenticate
        if 'results' not in self.session.get('%s/katello/api/v2/organizations' % self.base_url).json():
            raise AuthenticationError('Authentication failed')
项目:pyASA    作者:xpac1985    | 项目源码 | 文件源码
def validate_cert(self, validate_cert: bool):
        if not isinstance(validate_cert, bool):
            raise ValueError(f"{type(validate_cert)} is not a valid validate_cert argument type")
        self._caller.validate_cert = bool(validate_cert)
        if not validate_cert:
            # Warnings in urllib3 can only be disabled, not reenabled
            urllib3.disable_warnings(InsecureRequestWarning)
项目:peas    作者:mwrlabs    | 项目源码 | 文件源码
def __init__(self):
        self._backend = PY_ACTIVE_SYNC

        self._creds = {
            'server': None,
            'user': None,
            'password': None,
            'domain': None,     # This could be optional.
            'device_id': None,  # This could be optional.
        }

        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
项目:clus2017_iosxe_demo_booth    作者:CiscoDevNet    | 项目源码 | 文件源码
def create_room(room_name, token):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    headers = {'Authorization':'Bearer '+token,
                'Content-Type':'application/json'}
    body = json.dumps({'title':room_name})

    resp = requests.post('https://api.ciscospark.com/v1/rooms',
    verify=False,headers=headers,data=body)

    print resp
项目:clus2017_iosxe_demo_booth    作者:CiscoDevNet    | 项目源码 | 文件源码
def list_rooms(token):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    headers = {'Authorization':'Bearer '+token,
                'Content-Type':'application/json'}
    resp = requests.get('https://api.ciscospark.com/v1/rooms',
    verify=False,headers=headers)

    return resp
项目:clus2017_iosxe_demo_booth    作者:CiscoDevNet    | 项目源码 | 文件源码
def list_messages(room_id, token):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    headers = {'Authorization':'Bearer '+token,
                'Content-Type':'application/json'}
    resp = requests.get('https://api.ciscospark.com/v1/messages?roomId={}'.format(room_id),
                        verify=False,headers=headers)
    return resp.text
项目:clus2017_iosxe_demo_booth    作者:CiscoDevNet    | 项目源码 | 文件源码
def cleanup_room(room_id, token):

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    messages = json.loads(list_messages(room_id, token))

    headers = {'Authorization':'Bearer '+token,
                'Content-Type':'application/json'}

    for message in messages['items']:

        resp = requests.delete('https://api.ciscospark.com/v1/messages/{}'.format(message['id']),
                    verify=False,headers=headers)
        print resp.status_code
项目:opennms_alarmforwarder    作者:NETHINKS    | 项目源码 | 文件源码
def send_message(self, message):
        """send message

        Args:
            - message: message to send
        """
        # set parameters
        target = self.get_parameter("target")
        url = self.get_parameter("url")
        url_parameters = {
            "login": self.get_parameter("user"),
            "pass": self.get_parameter("password"),
            "to": target,
            "message": message
        }

        # send HTTP GET request to SMS Eagle
        try:
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
            response = requests.get(url, params=url_parameters)
            # check response
            if response.status_code != 200:
                self._logger.error("Could not send SMS to %s: %s", target, message)
            else:
                self._logger.info("Send SMS to %s: %s", target, message)
        except:
            self._logger.error("Could not connect to SMS Eagle")
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def test_init_without_cacert(self, disable_warning_mock):
        rest_client = v1.RestConnectorBase(
            "x.x.x.x", "admin", "Admin", bios_password='foo')

        self.assertEqual(rest_client.host, "x.x.x.x")
        self.assertEqual(rest_client.login, "admin")
        self.assertEqual(rest_client.password, "Admin")
        self.assertIsNone(rest_client.cacert)
        disable_warning_mock.assert_called_once_with(
            urllib3_exceptions.InsecureRequestWarning)
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def test_init_without_cacert(self, disable_warning_mock):
        ris_client = ris.RISOperations(
            "x.x.x.x", "admin", "Admin", bios_password='foo')

        self.assertEqual(ris_client.host, "x.x.x.x")
        self.assertEqual(ris_client.login, "admin")
        self.assertEqual(ris_client.password, "Admin")
        self.assertIsNone(ris_client.cacert)
        disable_warning_mock.assert_called_once_with(
            urllib3_exceptions.InsecureRequestWarning)
项目:deb-python-proliantutils    作者:openstack    | 项目源码 | 文件源码
def test_init_without_cacert(self, disable_warning_mock):
        ribcl_client = ribcl.RIBCLOperations(
            "x.x.x.x", "admin", "Admin", 60, 443)

        self.assertEqual(ribcl_client.host, "x.x.x.x")
        self.assertEqual(ribcl_client.login, "admin")
        self.assertEqual(ribcl_client.password, "Admin")
        self.assertEqual(ribcl_client.timeout, 60)
        self.assertEqual(ribcl_client.port, 443)
        self.assertIsNone(ribcl_client.cacert)
        disable_warning_mock.assert_called_once_with(
            urllib3_exceptions.InsecureRequestWarning)
项目:ibmsecurity    作者:IBM-Security    | 项目源码 | 文件源码
def _suppress_ssl_warning(self):
        # Disable https warning because of non-standard certs on appliance
        try:
            self.logger.debug("Suppressing SSL Warnings.")
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        except AttributeError:
            self.logger.warning("load requests.packages.urllib3.disable_warnings() failed")
项目:ibmsecurity    作者:IBM-Security    | 项目源码 | 文件源码
def _suppress_ssl_warning(self):
        # Disable https warning because of non-standard certs on appliance
        try:
            self.logger.debug("Suppressing SSL Warnings.")
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        except AttributeError:
            self.logger.warning("load requests.packages.urllib3.disable_warnings() failed")
项目:CycloTrain    作者:spcmnspff99    | 项目源码 | 文件源码
def __init__(self, filename = None, activity = None, name = None, description = None, private = False, trainer = False, commute = False, format = 'gpx', handle = None, apiKey = None):
        self.apiKey          = None
        self.reset()

        # turn off the warning because we're not checking the cert
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
项目:VirtBKP    作者:vacosta94    | 项目源码 | 文件源码
def attach_disk(bkpid,diskid,snapid):
 xmlattach =  "<disk id=\""+diskid+"\"><snapshot id=\""+snapid+"\"/> <active>true</active></disk>"
 urlattach = url+"/v3/vms/"+bkpid+"/disks/"
 headers = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
 resp_attach = requests.post(urlattach, data=xmlattach, headers=headers, verify=False, auth=(user,password))

# Funcion para desactivar disco virtual
# Function to deactivate virtual disk
项目:VirtBKP    作者:vacosta94    | 项目源码 | 文件源码
def detach_disk(bkpid,diskid):
 urldelete = url+"/vms/"+bkpid+"/diskattachments/"+diskid
 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
 requests.delete(urldelete, verify=False, auth=(user,password))

# Funcion para obtener el nombre de dispositivo de disco virtual
# Function to get device name of a virtual disk
项目:napalm-panos    作者:napalm-automation-community    | 项目源码 | 文件源码
def _import_file(self, filename):
        if not self.api_key:
            key = self.device.keygen()
        else:
            key = self.api_key

        params = {
            'type': 'import',
            'category': 'configuration',
            'key': key
        }

        path = os.path.basename(filename)

        mef = requests_toolbelt.MultipartEncoder(
            fields={
                'file': (path, open(filename, 'rb'), 'application/octet-stream')
            }
        )

        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        url = 'https://{0}/api/'.format(self.hostname)
        request = requests.post(
            url,
            verify=False,
            params=params,
            headers={'Content-Type': mef.content_type},
            data=mef
        )

        # if something goes wrong just raise an exception
        request.raise_for_status()
        response = xml.etree.ElementTree.fromstring(request.content)

        if response.attrib['status'] == 'error':
            return False
        else:
            return path
项目:AmazonRobot    作者:WuLC    | 项目源码 | 文件源码
def ignore_warnings():
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
    requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
    requests.packages.urllib3.disable_warnings(SNIMissingWarning)
项目:solidfire-cli    作者:solidfire    | 项目源码 | 文件源码
def issue_request(self, method, params, endpoint=None):
        if params is None:
            params = {}

        # NOTE(jdg): We allow passing in a new endpoint to issue_api_req
        # to enable some of the multi-cluster features like replication etc
        if endpoint is None:
            endpoint_dict = self.endpoint_dict
        payload = {'method': method, 'params': params}
        url = '%s/json-rpc/%s/' % (endpoint_dict['url'], self.endpoint_version)
        LOG.debug('Issue SolidFire API call: %s' % json.dumps(payload))

        start_time = time.time()
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
            req = requests.post(url,
                                data=json.dumps(payload),
                                auth=(endpoint_dict['login'],
                                      endpoint_dict['password']),
                                verify=False,
                                timeout=30)

        # FIXME(jdg): Failure cases like wrong password
        # missing something that cause req.json to puke
        response = req.json()
        req.close()
        end_time = time.time()
        duration = end_time - start_time

        LOG.debug('Raw response data from SolidFire API: %s' % response)
        # TODO(jdg): Add check/retry catch for things where it's appropriate
        if 'error' in response:
            msg = ('API response: %s'), response
            self.request_history.append(
                (method, start_time, duration, 'failed'))
            LOG.error('Error in API request: %s' % response['error'])
            raise sfexceptions.SolidFireRequestException(msg)
        self.request_history.append(
            (method, start_time, duration, 'ok'))
        return response['result']
项目:solidfire-cli    作者:solidfire    | 项目源码 | 文件源码
def send_request(self, method, params, version='1.0', endpoint=None):
        if params is None:
            params = {}

        # NOTE(jdg): We allow passing in a new endpoint to issue_api_req
        # to enable some of the multi-cluster features like replication etc
        if endpoint is None:
            endpoint_dict = self.endpoint_dict
        payload = {'method': method, 'params': params}

        url = '%s/json-rpc/%s/' % (endpoint_dict['url'], version)

        LOG.debug('Issue SolidFire API call: %s' % json.dumps(payload))

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
            req = requests.post(url,
                                data=json.dumps(payload),
                                auth=(endpoint_dict['login'],
                                      endpoint_dict['password']),
                                verify=False,
                                timeout=30)
        response = req.json()
        req.close()
        # TODO(jdg): Fix the above, failure cases like wrong password
        # missing something that cause req.json to puke

        LOG.debug('Raw response data from SolidFire API: %s' % response)
        # TODO(jdg): Add check/retry catch for things where it's appropriate
        if 'error' in response:
            msg = ('API response: %s'), response
            raise SolidFireRequestException(msg)
        return response['result']
项目:socrata-py    作者:socrata    | 项目源码 | 文件源码
def live_dangerously(self):
        """
        Disable SSL checking. Note that this should *only* be used while developing
        against a local Socrata instance.
        """
        import requests
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.verify = False
项目:Tutorial-Chatterbot    作者:isipalma    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        super(Microsoft, self).__init__(**kwargs)
        import requests
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        self.directline_host = kwargs.get('directline_host', 'https://directline.botframework.com')

        # NOTE: Direct Line client credentials are different from your bot's
        # credentials
        self.direct_line_token_or_secret = kwargs.\
            get('direct_line_token_or_secret')

        authorization_header = 'BotConnector  {}'.\
            format(self.direct_line_token_or_secret)

        self.headers = {
            'Authorization': authorization_header,
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'charset': 'utf-8'
        }

        conversation_data = self.start_conversation()
        self.conversation_id = conversation_data.get('conversationId')
        self.conversation_token = conversation_data.get('token')
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def setUpClass(cls):
        """Do an initial DB clean up in case something went wrong the last time.

        If a test failed really badly, the DB might be in a bad state despite
        attempts to clean up during tearDown().
        """
        warnings.filterwarnings('ignore')
        cls.rmt = BossRemote('test.cfg', API_VER)

        # Turn off SSL cert verification.  This is necessary for interacting with
        # developer instances of the Boss.
        cls.rmt.project_service.session_send_opts = {'verify': False}
        cls.rmt.metadata_service.session_send_opts = {'verify': False}
        cls.rmt.volume_service.session_send_opts = {'verify': False}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        cls.create_grp_name = 'int_test_group{}'.format(random.randint(0, 9999))
        cls.existing_grp_name = 'int_test_group_exists{}'.format(random.randint(0, 9999))

        cls.rmt.create_group(cls.existing_grp_name)

        cls.user_name = 'bossadmin'

        # Create a new user because the user tests run under, will
        # automatically be a member and maintainer of any groups created
        # during testing.
        cls.created_user = 'group_test_johndoeski{}'.format(random.randint(0, 9999))

        password = 'myPassW0rd'
        cls.rmt.add_user(cls.created_user, 'John', 'Doeski', 'jdoe{}@rime.com'.format(random.randint(0, 9999)),
                         password)
        token = cls.get_access_token(cls.created_user, password)
        cls.login_user(token)
项目:intern    作者:jhuapl-boss    | 项目源码 | 文件源码
def setUpClass(cls):
        """Do an initial DB clean up in case something went wrong the last time.

        If a test failed really badly, the DB might be in a bad state despite
        attempts to clean up during tearDown().
        """
        cls.rmt = BossRemote('test.cfg', API_VER)

        # Turn off SSL cert verification.  This is necessary for interacting with
        # developer instances of the Boss.
        cls.rmt.project_service.session_send_opts = {'verify': False}
        cls.rmt.metadata_service.session_send_opts = {'verify': False}
        cls.rmt.volume_service.session_send_opts = {'verify': False}
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        coll_name = 'collection_perm_test-{}'.format(random.randint(0, 9999))
        cls.coll = CollectionResource(coll_name, 'bar')

        cf_name = 'PermissionTestFrame{}'.format(random.randint(0, 9999))
        cls.coord = CoordinateFrameResource(
            cf_name, 'Test coordinate frame.', 0, 10, -5, 5, 3, 6,
            1, 1, 1, 'nanometers', 0, 'nanoseconds')

        cls.exp = ExperimentResource(
            'perm_test_exp', cls.coll.name, cls.coord.name, 'my experiment', 1,
            'isotropic', 1)

        cls.chan = ChannelResource(
            'perm_test_ch', cls.coll.name, cls.exp.name, 'image', 'test channel',
            0, 'uint8', 0)

        cls.grp_name = 'int_perm_test_group{}'.format(random.randint(0, 9999))

        cls.rmt.create_project(cls.coll)
        cls.rmt.create_project(cls.coord)
        cls.rmt.create_project(cls.exp)
        cls.rmt.create_project(cls.chan)
        cls.rmt.create_group(cls.grp_name)
项目:OAuth2Client    作者:antechrestos    | 项目源码 | 文件源码
def __init__(self, service_information, proxies=None):
        self.service_information = service_information
        self.proxies = proxies if proxies is not None else dict(http='', https='')
        self.authorization_code_context = None
        self.refresh_token = None
        self._session = None
        if service_information.skip_ssl_verifications:
            from requests.packages.urllib3.exceptions import InsecureRequestWarning
            import warnings

            warnings.filterwarnings('ignore', 'Unverified HTTPS request is being made.*', InsecureRequestWarning)
项目:simphony-remote    作者:simphony    | 项目源码 | 文件源码
def main():
    # We silence the insecure requests warnings we get for using
    # self-signed certificates.
    disable_warnings(InsecureRequestWarning)
    cli(obj=RemoteAppRestContext())
项目:katprep    作者:stdevel    | 项目源码 | 文件源码
def __init__(self, log_level, hostname,
        username, password, verify=True, prefix=""):
        """
        Constructor, creating the class. It requires specifying a
        hostname, username and password to access the API. After
        initialization, a connected is established.

        :param log_level: log level
        :type log_level: logging
        :param hostname: Foreman host
        :type hostname: str
        :param username: API username
        :type username: str
        :param password: corresponding password
        :type password: str
        :param verify: force SSL verification
        :type verify: bool
        :param prefix: API prefix (e.g. /katello)
        :type prefix: str
        """
        #set logging
        self.LOGGER.setLevel(log_level)
        #disable SSL warning outputs
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        #set connection information
        self.HOSTNAME = self.validate_hostname(hostname)
        self.USERNAME = username
        self.PASSWORD = password
        self.VERIFY = verify
        self.URL = "https://{0}{1}/api/v2".format(self.HOSTNAME, prefix)
        #start session and check API version if Foreman API
        self.__connect()
        if prefix == "":
            self.validate_api_support()
项目:idrac-ansible-module    作者:dell    | 项目源码 | 文件源码
def __init__ (self):
        # Disable insecure-certificate-warning message
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        return
项目:databricks-cli    作者:databricks    | 项目源码 | 文件源码
def perform_query(self, method, path, data = {}, headers = None):
        """set up connection and perform query"""
        if headers is None:
            headers = self.default_headers

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
            resp = self.session.request(method, self.url + path, data = json.dumps(data),
                verify = self.verify, headers = headers)

        try:
            resp.raise_for_status()
        except requests.exceptions.HTTPError, e:
            raise e
        return resp.json()