我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用http.client.HTTPSConnection()。
def httpPost(url,resource,params): headers = { "Content-type" : "application/x-www-form-urlencoded", } try : conn = httplib.HTTPSConnection(url, timeout=10) temp_params = urllib.parse.urlencode(params) conn.request("POST", resource, temp_params, headers) response = conn.getresponse() data = response.read().decode('utf-8') params.clear() conn.close() return data except: # except Exception,e: # print(Exception,":",e) traceback.print_exc() return False
def validate_optional_args(args): """Check if an argument was provided that depends on a module that may not be part of the Python standard library. If such an argument is supplied, and the module does not exist, exit with an error stating which module is missing. """ optional_args = { 'json': ('json/simplejson python module', json), 'secure': ('SSL support', HTTPSConnection), } for arg, info in optional_args.items(): if getattr(args, arg, False) and info[1] is None: raise SystemExit('%s is not installed. --%s is ' 'unavailable' % (info[0], arg))
def textanalyze(self,index_name, analyzer, text): # Create JSON string for request body reqobject={} reqobject['text'] = text reqobject['analyzer'] = analyzer io=StringIO() json.dump(reqobject, io) req_body = io.getvalue() # HTTP request to Azure search REST API conn = httplib.HTTPSConnection(self.__api_url) conn.request("POST", u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION), req_body, self.headers) response = conn.getresponse() #print "status:", response.status, response.reason data = (response.read()).decode('utf-8') #print("data:{}".format(data)) conn.close() return data
def sendRequest(host, port, path, headers, params, reqType="GET"): params = urlencode(params) path = path + "?"+ params if reqType == "GET" and params else path if len(headers): logger.debug(headers) if len(params): logger.debug(params) logger.debug("Opening connection to %s" % host); conn = httplib.HTTPSConnection(host ,port) if port == 443 else httplib.HTTPConnection(host ,port) logger.debug("Sending %s request to %s" % (reqType, path)) conn.request(reqType, path, params, headers); response = conn.getresponse() return response
def connect(self): """Overrides HTTPSConnection.connect to specify TLS version""" # Standard implementation from HTTPSConnection, which is not # designed for extension, unfortunately if sys.version_info >= (2, 7): sock = socket.create_connection((self.host, self.port), self.timeout, self.source_address) elif sys.version_info >= (2, 6): sock = socket.create_connection((self.host, self.port), self.timeout) else: sock = socket.create_connection((self.host, self.port)) if getattr(self, '_tunnel_host', None): self.sock = sock self._tunnel() # This is the only difference; default wrap_socket uses SSLv23 self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_TLSv1)
def __getattr__(self, name): # Anything that can't be found on this instance is presumably a # property of underlying connection object. # We need to be a little bit careful here. There are a few methods # that can act on a HTTPSConnection before it actually connects to # the remote server. We don't want to change the semantics of the, # HTTPSConnection so we need to spot these and queue them up. When # we actually create the backing Connection, we'll apply them # immediately. These methods can't throw exceptions, so we should # be fine. delay_methods = ["set_tunnel", "set_debuglevel"] if self._conn is None and name in delay_methods: # Return a little closure that saves off the method call to # apply later. def capture(obj, *args, **kwargs): self._call_queue.append((name, args, kwargs)) return capture elif self._conn is None: # We're being told to do something! We can now connect to the # remote server and build the connection object. self._delayed_connect() # Call through to the underlying object. return getattr(self._conn, name)
def test_local_bad_hostname(self): # The (valid) cert doesn't validate the HTTP hostname import ssl from test.ssl_servers import make_https_server server = make_https_server(self, CERT_fakehostname) context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(CERT_fakehostname) h = client.HTTPSConnection('localhost', server.port, context=context) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') # Same with explicit check_hostname=True h = client.HTTPSConnection('localhost', server.port, context=context, check_hostname=True) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') # With check_hostname=False, the mismatching is ignored h = client.HTTPSConnection('localhost', server.port, context=context, check_hostname=False) h.request('GET', '/nonexistent') resp = h.getresponse() self.assertEqual(resp.status, 404)
def checkUpdate(): try: conn = httplib.HTTPSConnection("raw.githubusercontent.com", 443) conn.request("GET", "/JonathanSalwan/ROPgadget/master/ropgadget/version.py") except: print("Can't connect to raw.githubusercontent.com") return d = conn.getresponse().read() majorVersion = re.search("MAJOR_VERSION.+=.+(?P<value>[\d])", d).group("value") minorVersion = re.search("MINOR_VERSION.+=.+(?P<value>[\d])", d).group("value") webVersion = int("%s%s" %(majorVersion, minorVersion)) curVersion = int("%s%s" %(MAJOR_VERSION, MINOR_VERSION)) if webVersion > curVersion: print("The version %s.%s is available. Currently, you use the version %d.%d." %(majorVersion, minorVersion, MAJOR_VERSION, MINOR_VERSION)) else: print("Your version is up-to-date.")
def test_local_bad_hostname(self): # The (valid) cert doesn't validate the HTTP hostname import ssl server = self.make_server(CERT_fakehostname) context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(CERT_fakehostname) h = client.HTTPSConnection('localhost', server.port, context=context) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') # Same with explicit check_hostname=True h = client.HTTPSConnection('localhost', server.port, context=context, check_hostname=True) with self.assertRaises(ssl.CertificateError): h.request('GET', '/') # With check_hostname=False, the mismatching is ignored h = client.HTTPSConnection('localhost', server.port, context=context, check_hostname=False) h.request('GET', '/nonexistent') resp = h.getresponse() self.assertEqual(resp.status, 404) del server
def get_json_response(server, api, username, password): """ Returns the response from the URL specified """ try: # lib opener response = {} context = ssl._create_unverified_context() conn = HTTPSConnection(server, context=context) auth = str.encode("%s:%s" % (username, password)) user_and_pass = b64encode(auth).decode("ascii") headers = {'Authorization': 'Basic %s' % user_and_pass, "Accept": 'application/json'} conn.request('GET', api, headers=headers) res = conn.getresponse() bit_data = res.read() string_data = bit_data.decode(encoding='UTF-8') response['data'] = string_data response['status'] = 200 except: print("--Unexpected error:", sys.exc_info()[1]) response['data'] = sys.exc_info()[1] response['status'] = 400 return response
def http(method, url, body=None, headers=None): url_info = urlparse.urlparse(url) if url_info.scheme == "https": con = httplib.HTTPSConnection(url_info.hostname, url_info.port or 443) else: con = httplib.HTTPConnection(url_info.hostname, url_info.port or 80) con.request(method, url_info.path, body, headers) response = con.getresponse() try: if 400 <= response.status < 500: raise HttpClientError(response.status, response.reason, response.read()) elif 500 <= response.status < 600: raise HttpServerError(response.status, response.reason, response.read()) else: yield response finally: con.close()
def wisp_callback(url): """ This returns current server's time of given url. :param url: url. (e.g. www.example.com, http://www.example.com, https://www.example.com:8080/test) :return: Date and time or the server. """ parsed_url = urlparse(url) protocol = parsed_url.scheme or "http" port = parsed_url.port or (80 if protocol == "http" else 443) path = parsed_url.path or "/" if parsed_url.hostname: url = parsed_url.hostname else: path = "/" url = parsed_url.path request = "GET" conn = (HTTPConnection if protocol == "http" else HTTPSConnection)(url, port) conn.request(request, path) response = conn.getresponse() time = response.getheader("Date") return time
def ise_m_activecount(ip,username,password,port=443): #This sets up the https connection context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)#ssl??????? context.verify_mode = ssl.CERT_NONE#CERT_NONE, CERT_OPTIONAL or CERT_REQUIRED??????????? context.load_verify_locations('/usr/share/kde4/apps/kssl/ca-bundle.crt')#????? c = HTTPSConnection(ip, port=port, context=context) user_pass_str = username + ':' + password user_pass_str_encode = user_pass_str.encode() userAndPass = b64encode(user_pass_str_encode).decode("ascii") headers = { 'Authorization' : 'Basic %s' % userAndPass } c.request('GET', '/admin/API/mnt/Session/ActiveCount', headers=headers) res = c.getresponse() data = res.read() root = XML(data.decode()) activecount = root.find('count').text return activecount
def https_connection(self): """Return an https connection to this Connection's endpoint. Returns a 3-tuple containing:: 1. The :class:`HTTPSConnection` instance 2. Dictionary of auth headers to be used with the connection 3. The root url path (str) to be used for requests. """ endpoint = self.endpoint host, remainder = endpoint.split(':', 1) port = remainder if '/' in remainder: port, _ = remainder.split('/', 1) conn = HTTPSConnection( host, int(port), context=self._get_ssl(self.cacert), ) path = ( "/model/{}".format(self.uuid) if self.uuid else "" ) return conn, self._http_headers(), path
def httpGet(url, resource, params=''): try : conn = httplib.HTTPSConnection(url, timeout=10) conn.request("GET",resource + '?' + params) response = conn.getresponse() data = response.read().decode('utf-8') return json.loads(data) except: return False
def do_get(url): parsed = urlparse.urlparse(url) path = parsed.path if parsed.query: path = '%s?%s' % (path, parsed.query) if parsed.scheme == 'http': conn = httplib.HTTPConnection(TARGET_IP) elif parsed.scheme == 'https': conn = httplib.HTTPSConnection(TARGET_IP) conn.request('GET', path, headers={'Host': parsed.netloc}) resp = conn.getresponse() body = resp.read().decode('utf8') resp.close() conn.close() return resp, body
def do_get(url): parsed = urlparse.urlparse(url) path = parsed.path if parsed.query: path = '%s?%s' % (path, parsed.query) if parsed.scheme == 'http': conn = httplib.HTTPConnection(TARGET_IP) elif parsed.scheme == 'https': conn = httplib.HTTPSConnection(TARGET_IP, timeout=8, context=ssl._create_unverified_context()) conn.request('GET', path, headers={'Host': parsed.netloc}) resp = conn.getresponse() body = resp.read().decode('utf8') resp.close() conn.close() return resp, body
def http_post(posturl, headers, data): parsed = urllib.parse.urlparse(posturl) if ':' in parsed.netloc: host, port = parsed.netloc.split(':') else: host = parsed.netloc port = '443' if posturl.startswith('https') else '80' if posturl.startswith('https'): conn = HTTPSConnection(host, int(port)) else: conn = HTTPConnection(host, int(port)) conn.request('POST', parsed.path, headers=headers, body=data) return conn.getresponse()
def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs): self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password) httplib.HTTPSConnection.__init__(self, *args, **kwargs)
def mastQuery(request): """Perform a MAST query. Parameters ---------- request (dictionary): The Mashup request json object Returns head,content where head is the response HTTP headers, and content is the returned data. """ server = 'mast.stsci.edu' # Grab Python Version version = ".".join(map(str, sys.version_info[:3])) # Create Http Header Variables headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain", "User-agent": "python-requests/" + version} # Encoding the request as a json string requestString = json.dumps(request) requestString = urlencode(requestString) # opening the https connection conn = httplib.HTTPSConnection(server) # Making the query conn.request("POST", "/api/v0/invoke", "request=" + requestString, headers) # Getting the response resp = conn.getresponse() head = resp.getheaders() content = resp.read().decode('utf-8') # Close the https connection conn.close() return head, content
def get_https_connection(self, host, timeout=300): """Returns a HTTPSConnection""" return HTTPSConnection( host, key_file=self._ssl_config['key'], cert_file=self._ssl_config['cert'] )
def __init__(self, token = ''): self.connection = HTTPSConnection('dualis.dhbw.de') self.token = token self.stdHeader = { 'Cookie': 'cnsc=0', # The Dualis System assumes by the presence of this field that we are ready to handle # and store cookies # Yeah, right... We definitively do that... # (No, we don't have to. Chrome is also sending cnsc=0 everytime and it works fine) 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36' # copied straight out of Chrome }
def __init__(self , token): self.token = token self.httpClient = HTTPSConnection(vs.HTTP_URL, vs.HTTP_PORT)
def _new_conn(self): """ Return a fresh :class:`httplib.HTTPSConnection`. """ self.num_connections += 1 log.info("Starting new HTTPS connection (%d): %s" % (self.num_connections, self.host)) if not ssl: # Platform-specific: Python compiled without +ssl if not HTTPSConnection or HTTPSConnection is object: raise SSLError("Can't connect to HTTPS URL because the SSL " "module is not available.") return HTTPSConnection(host=self.host, port=self.port, strict=self.strict) connection = VerifiedHTTPSConnection(host=self.host, port=self.port, strict=self.strict) connection.set_cert(key_file=self.key_file, cert_file=self.cert_file, cert_reqs=self.cert_reqs, ca_certs=self.ca_certs) if self.ssl_version is None: connection.ssl_version = ssl.PROTOCOL_SSLv23 else: connection.ssl_version = self.ssl_version return connection
def __init__(self, host, **kwargs): httplib.HTTPSConnection.__init__(self, host, **kwargs)
def connect(self): """Overrides HTTPSConnection.connect to specify TLS version""" # Standard implementation from HTTPSConnection, which is not # designed for extension, unfortunately if sys.version_info >= (2, 7): sock = socket.create_connection((self.host, self.port), self.timeout, self.source_address) elif sys.version_info >= (2, 6): sock = socket.create_connection((self.host, self.port), self.timeout) else: sock = socket.create_connection((self.host, self.port)) if getattr(self, '_tunnel_host', None): self.sock = sock self._tunnel() if hasattr(ssl, 'SSLContext'): # Since python 2.7.9, tls 1.1 and 1.2 are supported via # SSLContext ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ssl_context.options |= ssl.OP_NO_SSLv2 ssl_context.options |= ssl.OP_NO_SSLv3 if self.key_file and self.cert_file: ssl_context.load_cert_chain(keyfile=self.key_file, certfile=self.cert_file) self.sock = ssl_context.wrap_socket(sock) else: # This is the only difference; default wrap_socket uses SSLv23 self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_TLSv1)
def __init__(self, url, key_file=None, cert_file=None, debug=False): """LXD Client. :param url: The URL of the LXD server. (e.g. unix:/var/lib/lxd/unix.socket or https://127.0.0.1) :type url: ``str`` :param key_file: The path of the client certificate key file. :type key_file: ``str`` :param cert_file: The path of the client certificate file. :type cert_file: ``str`` :param debug: The debug flag. The request and response are stored in logs when debug is true. :type debug: ``bool`` """ self.url = url self.debug = debug self.logs = [] if url.startswith('https:'): self.cert_file = cert_file self.key_file = key_file parts = generic_urlparse(urlparse(self.url)) ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ctx.load_cert_chain(cert_file, keyfile=key_file) self.connection = HTTPSConnection(parts.get('netloc'), context=ctx) elif url.startswith('unix:'): unix_socket_path = url[len('unix:'):] self.connection = UnixHTTPConnection(unix_socket_path) else: raise LXDClientException('URL scheme must be unix: or https:')
def __init__(self, *args, **kwargs): httplib.HTTPSConnection.__init__(self, *args, **kwargs) if HAS_SSLCONTEXT: self.context = create_default_context() if self.cert_file: self.context.load_cert_chain(self.cert_file, self.key_file)
def testHTTPSConnectionSourceAddress(self): self.conn = client.HTTPSConnection(HOST, self.port, source_address=('', self.source_port)) # We don't test anything here other the constructor not barfing as # this code doesn't deal with setting up an active running SSL server # for an ssl_wrapped connect() to actually return from.
def setUp(self): if not hasattr(client, 'HTTPSConnection'): self.skipTest('ssl support required')
def test_attributes(self): # simple test to check it's storing the timeout h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) self.assertEqual(h.timeout, 30)
def test_networked(self): # Default settings: no cert verification is done support.requires('network') with support.transient_internet('svn.python.org'): h = client.HTTPSConnection('svn.python.org', 443) h.request('GET', '/') resp = h.getresponse() self._check_svn_python_org(resp)
def test_networked_good_cert(self): # We feed a CA cert that validates the server's cert import ssl support.requires('network') with support.transient_internet('svn.python.org'): context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(CACERT_svn_python_org) h = client.HTTPSConnection('svn.python.org', 443, context=context) h.request('GET', '/') resp = h.getresponse() self._check_svn_python_org(resp)
def test_local_good_hostname(self): # The (valid) cert validates the HTTP hostname import ssl from test.ssl_servers import make_https_server server = make_https_server(self, CERT_localhost) context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(CERT_localhost) h = client.HTTPSConnection('localhost', server.port, context=context) h.request('GET', '/nonexistent') resp = h.getresponse() self.assertEqual(resp.status, 404)
def get_ip_address(self, set_result=False): """ Get the bridge ip address from the meethue.com nupnp api """ connection = httplib.HTTPSConnection('www.meethue.com') connection.request('GET', '/api/nupnp') logger.info('Connecting to meethue.com/api/nupnp') result = connection.getresponse() if PY3K: data = json.loads(str(result.read(), encoding='utf-8')) else: result_str = result.read() data = json.loads(result_str) """ close connection after read() is done, to prevent issues with read() """ connection.close() ip = str(data[0]['internalipaddress']) if ip is not '': if set_result: self.ip = ip return ip else: return False