我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用http.client.HTTPConnection()。
def stop(self): """Stops the webserver.""" self._http_on = False conn = HTTPConnection( self._host + ":" + str(self._port) ) try: conn.request("GET", "/shutdown") except ConnectionRefusedError: pass conn.close() while self._http_thread.is_alive(): sleep(0.5) self._httpd.server_close() self._httpd.socket.close()
def retrieve_json(self,url): ''' Retrieve data from the Veneer service at the given url path. url: Path to required resource, relative to the root of the Veneer service. ''' if PRINT_URLS: print("*** %s ***" % (url)) if self.protocol=='file': text = open(self.prefix+url+self.data_ext).read() else: conn = hc.HTTPConnection(self.host,port=self.port) conn.request('GET',quote(url+self.data_ext)) resp = conn.getresponse() text = resp.read().decode('utf-8') #text = urlopen(self.base_url + quote(url+self.data_ext)).read().decode('utf-8') text = self._replace_inf(text) if PRINT_ALL: print(json.loads(text)) print("") return json.loads(text)
def block_http(whitelist): def whitelisted(self, host, *args, **kwargs): try: string_type = basestring except NameError: # python3 string_type = str if isinstance(host, string_type) and host not in whitelist: logger.warning('Denied HTTP connection to: %s' % host) raise MockHttpCall(host) logger.debug('Allowed HTTP connection to: %s' % host) return self.old(host, *args, **kwargs) whitelisted.blockade = True if not getattr(httplib.HTTPConnection, 'blockade', False): logger.debug('Monkey patching httplib') httplib.HTTPConnection.old = httplib.HTTPConnection.__init__ httplib.HTTPConnection.__init__ = whitelisted
def do_command(self, verb, args): conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout) try: body = 'cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8')) for i in range(len(args)): body += '&' + unicode(i+1) + '=' + \ urllib_parse.quote_plus(unicode(args[i]).encode('utf-8')) if (None != self.sessionId): body += "&sessionId=" + unicode(self.sessionId) headers = { "Content-Type": "application/x-www-form-urlencoded; charset=utf-8" } conn.request("POST", "/selenium-server/driver/", body, headers) response = conn.getresponse() data = unicode(response.read(), "UTF-8") if (not data.startswith('OK')): raise Exception(data) return data finally: conn.close()
def sendRequest(host, port, path, headers, params, reqType="GET"): params = urlencode(params) path = path + "?"+ params if reqType == "GET" and params else path if len(headers): logger.debug(headers) if len(params): logger.debug(params) logger.debug("Opening connection to %s" % host); conn = httplib.HTTPSConnection(host ,port) if port == 443 else httplib.HTTPConnection(host ,port) logger.debug("Sending %s request to %s" % (reqType, path)) conn.request(reqType, path, params, headers); response = conn.getresponse() return response
def assertResponse(self, app, method, url, status=None, headers=None, content=None): host, port = 'localhost', 80 http_client_intercept.install() add_wsgi_intercept(host, port, app) client = http_lib.HTTPConnection(host, port) client.request(method, url) response = client.getresponse() if status is not None: self.assertEqual(response.status, status) headers = headers or {} for k, v in headers.items(): self.assertEqual(response.getheader(k), v) if content is not None: self.assertEqual(response.read(), content) client.close() remove_wsgi_intercept(host, port) http_client_intercept.uninstall()
def test_ipv6host_header(self): # Default host header on IPv6 transaction should wrapped by [] if # its actual IPv6 address expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ b'Accept-Encoding: identity\r\n\r\n' conn = client.HTTPConnection('[2001::]:81') sock = FakeSocket('') conn.sock = sock conn.request('GET', '/foo') self.assertTrue(sock.data.startswith(expected)) expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ b'Accept-Encoding: identity\r\n\r\n' conn = client.HTTPConnection('[2001:102A::]') sock = FakeSocket('') conn.sock = sock conn.request('GET', '/foo') self.assertTrue(sock.data.startswith(expected))
def test_epipe(self): sock = EPipeSocket( "HTTP/1.0 401 Authorization Required\r\n" "Content-type: text/html\r\n" "WWW-Authenticate: Basic realm=\"example\"\r\n", b"Content-Length") conn = client.HTTPConnection("example.com") conn.sock = sock self.assertRaises(socket.error, lambda: conn.request("PUT", "/url", "body")) resp = conn.getresponse() self.assertEqual(401, resp.status) self.assertEqual("Basic realm=\"example\"", resp.getheader("www-authenticate")) # Test lines overflowing the max line size (_MAXLINE in http.client)
def test_userpass_inurl_w_spaces(self): self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") try: userpass = "a b:c d" url = "http://{0}@python.org/".format(userpass) fakehttp_wrapper = http_client.HTTPConnection authorization = ("Authorization: Basic %s\r\n" % b64encode(userpass.encode("ASCII")).decode("ASCII")) fp = urlopen(url) # The authorization header must be in place self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8")) self.assertEqual(fp.readline(), b"Hello!") self.assertEqual(fp.readline(), b"") # the spaces are quoted in URL so no match self.assertNotEqual(fp.geturl(), url) self.assertEqual(fp.getcode(), 200) finally: self.unfakehttp()
def test_send_updating_file(self): def data(): yield 'data' yield None yield 'data_two' class UpdatingFile(): mode = 'r' d = data() def read(self, blocksize=-1): return self.d.__next__() expected = b'data' conn = client.HTTPConnection('example.com') sock = FakeSocket("") conn.sock = sock conn.send(UpdatingFile()) self.assertEqual(sock.data, expected)
def test_epipe(self): sock = EPipeSocket( "HTTP/1.0 401 Authorization Required\r\n" "Content-type: text/html\r\n" "WWW-Authenticate: Basic realm=\"example\"\r\n", b"Content-Length") conn = client.HTTPConnection("example.com") conn.sock = sock self.assertRaises(OSError, lambda: conn.request("PUT", "/url", "body")) resp = conn.getresponse() self.assertEqual(401, resp.status) self.assertEqual("Basic realm=\"example\"", resp.getheader("www-authenticate")) # Test lines overflowing the max line size (_MAXLINE in http.client)
def status(self): """ Check job status on the server """ headers = {} # add authentication and other cookies to the header try: location = self.location location = location.split('://')[-1] self.response = self.session.get(self.protocol + '://' + location) data = self.response.text except: connection = HTTPConnection(self.host, self.port) connection.request("GET", self.path + "/" + self.jobid, headers) self.response = connection.getresponse() data = self.response.read() #XML response: parse it to obtain the current status dom = parseString(data) phase_element = dom.getElementsByTagName('uws:phase')[0] phase_value_element = phase_element.firstChild phase = phase_value_element.toxml() return phase
def http(method, url, body=None, headers=None): url_info = urlparse.urlparse(url) if url_info.scheme == "https": con = httplib.HTTPSConnection(url_info.hostname, url_info.port or 443) else: con = httplib.HTTPConnection(url_info.hostname, url_info.port or 80) con.request(method, url_info.path, body, headers) response = con.getresponse() try: if 400 <= response.status < 500: raise HttpClientError(response.status, response.reason, response.read()) elif 500 <= response.status < 600: raise HttpServerError(response.status, response.reason, response.read()) else: yield response finally: con.close()
def wisp_callback(url): """ This returns current server's time of given url. :param url: url. (e.g. www.example.com, http://www.example.com, https://www.example.com:8080/test) :return: Date and time or the server. """ parsed_url = urlparse(url) protocol = parsed_url.scheme or "http" port = parsed_url.port or (80 if protocol == "http" else 443) path = parsed_url.path or "/" if parsed_url.hostname: url = parsed_url.hostname else: path = "/" url = parsed_url.path request = "GET" conn = (HTTPConnection if protocol == "http" else HTTPSConnection)(url, port) conn.request(request, path) response = conn.getresponse() time = response.getheader("Date") return time
def _do_read(self, query, raw=False): # send query to server, return JSON rethinker = doublethink.Rethinker(db="trough_configuration", servers=self.rethinkdb) healthy_databases = list(rethinker.table('services').get_all(self.database, index='segment').run()) healthy_databases = [db for db in healthy_databases if db['role'] == 'trough-read' and (rethinker.now().run() - db['last_heartbeat']).seconds < db['ttl']] try: assert len(healthy_databases) > 0 except: raise Exception('No healthy node found for segment %s' % self.database) url = urlparse(healthy_databases[0].get('url')) if self.proxy: conn = HTTPConnection(self.proxy, self.proxy_port) conn.set_tunnel(url.netloc, url.port) conn.sock = socks.socksocket() conn.sock.set_proxy(self.proxy_type, self.proxy, self.proxy_port) conn.sock.connect((url.netloc.split(":")[0], url.port)) else: conn = HTTPConnection(url.netloc) request_path = "%s?%s" % (url.path, url.query) conn.request("POST", request_path, query) response = conn.getresponse() results = json.loads(response.read()) self._last_results = results
def test_httpclient_in_out(): hostname = str(uuid4()) port = 9999 url = 'http://%s:%s/' % (hostname, port) with HttpClientInterceptor(app=app, url=url): client = http_client.HTTPConnection(hostname, port) client.request('GET', '/') response = client.getresponse() content = response.read().decode('utf-8') assert response.status == 200 assert 'WSGI intercept successful!' in content # outside the context manager the intercept does not work with py.test.raises(socket.gaierror): client = http_client.HTTPConnection(hostname, port) client.request('GET', '/') # Httplib2
def _send_soap_request(location, upnp_schema, control_path, soap_fn, soap_message): """ Send out SOAP request to UPnP device and return a response. """ headers = { 'SOAPAction': ( '"urn:schemas-upnp-org:service:{schema}:' '1#{fn_name}"'.format(schema=upnp_schema, fn_name=soap_fn) ), 'Content-Type': 'text/xml' } logging.debug("Sending UPnP request to {0}:{1}...".format( location.hostname, location.port)) conn = client.HTTPConnection(location.hostname, location.port) conn.request('POST', control_path, soap_message, headers) response = conn.getresponse() conn.close() return _parse_for_errors(response)
def _normalize_conn_params(self, conn_or_conn_params): """Normalize conn_param tuple. Args: conn_or_conn_params: either a HTTP(S)Connection object or the resolved conn_params tuple returned by self._conn_params(). Returns: Normalized conn_param tuple """ if (not isinstance(conn_or_conn_params, tuple) and not isinstance(conn_or_conn_params, httplib.HTTPConnection)): LOG.debug("Invalid conn_params value: '%s'", str(conn_or_conn_params)) return conn_or_conn_params if isinstance(conn_or_conn_params, httplib.HTTPConnection): conn_params = self._conn_params(conn_or_conn_params) else: conn_params = conn_or_conn_params host, port, is_ssl = conn_params if port is None: port = 443 if is_ssl else 80 return (host, port, is_ssl)
def saveFailedTest(data, expect, filename): """Upload failed test images to web server to allow CI test debugging. """ commit = runSubprocess(['git', 'rev-parse', 'HEAD']) name = filename.split('/') name.insert(-1, commit.strip()) filename = '/'.join(name) host = 'data.pyqtgraph.org' # concatenate data, expect, and diff into a single image ds = data.shape es = expect.shape shape = (max(ds[0], es[0]) + 4, ds[1] + es[1] + 8 + max(ds[1], es[1]), 4) img = np.empty(shape, dtype=np.ubyte) img[..., :3] = 100 img[..., 3] = 255 img[2:2+ds[0], 2:2+ds[1], :ds[2]] = data img[2:2+es[0], ds[1]+4:ds[1]+4+es[1], :es[2]] = expect diff = makeDiffImage(data, expect) img[2:2+diff.shape[0], -diff.shape[1]-2:-2] = diff png = makePng(img) conn = httplib.HTTPConnection(host) req = urllib.urlencode({'name': filename, 'data': base64.b64encode(png)}) conn.request('POST', '/upload.py', req) response = conn.getresponse().read() conn.close() print("\nImage comparison failed. Test result: %s %s Expected result: " "%s %s" % (data.shape, data.dtype, expect.shape, expect.dtype)) print("Uploaded to: \nhttp://%s/data/%s" % (host, filename)) if not response.startswith(b'OK'): print("WARNING: Error uploading data to %s" % host) print(response)
def drop_run(self,run='latest'): ''' Tell Source to drop/delete a specific set of results from memory. run: Run number to delete. Default ='latest'. Valid values are 'latest' and integers from 1 ''' assert self.live_source conn = hc.HTTPConnection(self.host,port=self.port) conn.request('DELETE','/runs/%s'%str(run)) resp = conn.getresponse() code = resp.getcode() return code
def do_get(url): parsed = urlparse.urlparse(url) path = parsed.path if parsed.query: path = '%s?%s' % (path, parsed.query) if parsed.scheme == 'http': conn = httplib.HTTPConnection(TARGET_IP) elif parsed.scheme == 'https': conn = httplib.HTTPSConnection(TARGET_IP) conn.request('GET', path, headers={'Host': parsed.netloc}) resp = conn.getresponse() body = resp.read().decode('utf8') resp.close() conn.close() return resp, body
def do_get(url): parsed = urlparse.urlparse(url) path = parsed.path if parsed.query: path = '%s?%s' % (path, parsed.query) if parsed.scheme == 'http': conn = httplib.HTTPConnection(TARGET_IP) elif parsed.scheme == 'https': conn = httplib.HTTPSConnection(TARGET_IP, timeout=8, context=ssl._create_unverified_context()) conn.request('GET', path, headers={'Host': parsed.netloc}) resp = conn.getresponse() body = resp.read().decode('utf8') resp.close() conn.close() return resp, body
def http_size(host_name, path): try: from httplib import HTTPConnection except ImportError: from http.client import HTTPConnection try: conn = HTTPConnection(host_name, timeout=3) conn.request("GET", path) resp = conn.getresponse() return int(resp.getheader("content-length")) except Exception as e: print(e) return 0
def http_post(posturl, headers, data): parsed = urllib.parse.urlparse(posturl) if ':' in parsed.netloc: host, port = parsed.netloc.split(':') else: host = parsed.netloc port = '443' if posturl.startswith('https') else '80' if posturl.startswith('https'): conn = HTTPSConnection(host, int(port)) else: conn = HTTPConnection(host, int(port)) conn.request('POST', parsed.path, headers=headers, body=data) return conn.getresponse()
def make_connection(self, host): self.realhost = host if self.proxy: return httplib.HTTPConnection(self.proxy, timeout=self.timeout) else: return httplib.HTTPConnection(self.realhost, timeout=self.timeout)
def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs): self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password) httplib.HTTPConnection.__init__(self, *args, **kwargs)
def init_app(self, app): self.token = app.config.get('HUSKAR_TOKEN') self.team = app.config.get('HUSKAR_TEAM') host = app.config.get('HUSKAR_API_HOST') port = app.config.get('HUSKAR_API_PORT') if not all([self.token, host, port, self.team]): raise HuskarException('Missing huskar connection infomation') self.client = http.HTTPConnection(host, port, timeout=self.timeout)
def init_app(self, app): host = app.config.get('SASH_HOST') port = app.config.get('SASH_PORT') if not all([host, port]): raise SashException('Missing sash connection infomation') self.client = http.HTTPConnection(host, port, timeout=self.timeout)
def _fetch_state(self, uid) -> str: connection = HTTPConnection('vorlesungsplan.dhbw-mannheim.de') connection.request( 'GET', '/ical.php?uid=%s'%(uid) ) response = connection.getresponse() response_status = response.getcode() if response_status != 200: raise RuntimeError('Server reported Status %s'%(response_status)) return response.read().decode('utf-8')
def _new_conn(self): """ Return a fresh :class:`httplib.HTTPConnection`. """ self.num_connections += 1 log.info("Starting new HTTP connection (%d): %s" % (self.num_connections, self.host)) return HTTPConnection(host=self.host, port=self.port, strict=self.strict)
def connectOpenAPIServer(getType): global conn, server if getType == 3: conn = HTTPConnection(daumServer) else: conn = HTTPConnection(server)