我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用http.client()。
def __init__(self, host, port=None, key_file=None, cert_file=None, timeout=None, proxy_info=None, ca_certs=None, disable_ssl_certificate_validation=False): # TODO: implement proxy_info self.proxy_info = proxy_info context = None if ca_certs is None: ca_certs = CA_CERTS if (cert_file or ca_certs): if not hasattr(ssl, 'SSLContext'): raise CertificateValidationUnsupportedInPython31() context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) if disable_ssl_certificate_validation: context.verify_mode = ssl.CERT_NONE else: context.verify_mode = ssl.CERT_REQUIRED if cert_file: context.load_cert_chain(cert_file, key_file) if ca_certs: context.load_verify_locations(ca_certs) http.client.HTTPSConnection.__init__( self, host, port=port, key_file=key_file, cert_file=cert_file, timeout=timeout, context=context, check_hostname=disable_ssl_certificate_validation ^ True)
def __init__(self, info): # info is either an email.message or # an httplib.HTTPResponse object. if isinstance(info, http.client.HTTPResponse): for key, value in info.getheaders(): key = key.lower() prev = self.get(key) if prev is not None: value = ', '.join((prev, value)) self[key] = value self.status = info.status self['status'] = str(self.status) self.reason = info.reason self.version = info.version elif isinstance(info, email.message.Message): for key, value in list(info.items()): self[key.lower()] = value self.status = int(self['status']) else: for key, value in info.items(): self[key.lower()] = value self.status = int(self.get('status', self.status))
def debug(self, value): """ Sets the debug status. :param value: The debug status, True or False. :type: bool """ self.__debug = value if self.__debug: # if debug status is True, turn on debug logging for _, logger in iteritems(self.logger): logger.setLevel(logging.DEBUG) # turn on httplib debug httplib.HTTPConnection.debuglevel = 1 else: # if debug status is False, turn off debug logging, # setting log level to default `logging.WARNING` for _, logger in iteritems(self.logger): logger.setLevel(logging.WARNING) # turn off httplib debug httplib.HTTPConnection.debuglevel = 0
def execute(self, context): scene = context.scene netsettings = scene.network_render try: conn = clientConnection(netsettings, report = self.report) if conn: # Sending file client.sendJobBaking(conn, scene) conn.close() self.report({'INFO'}, "Job sent to master") except Exception as err: self.report({'ERROR'}, str(err)) return {'FINISHED'}
def execute(self, context): scene = context.scene netsettings = scene.network_render try: conn = clientConnection(netsettings, report = self.report) if conn: # Sending file scene.network_render.job_id = client.sendJob(conn, scene, True) conn.close() self.report({'INFO'}, "Job sent to master") except Exception as err: self.report({'ERROR'}, str(err)) return {'FINISHED'}
def execute(self, context): scene = context.scene netsettings = scene.network_render try: conn = clientConnection(netsettings, report = self.report) if conn: # Sending file scene.network_render.job_id = client.sendJob(conn, scene, False) conn.close() self.report({'INFO'}, "Job sent to master") except Exception as err: self.report({'ERROR'}, str(err)) return {'FINISHED'}
def clientVerifyVersion(conn, timeout): with ConnectionContext(timeout): conn.request("GET", "/version") response = conn.getresponse() if response.status != http.client.OK: conn.close() return False server_version = response.read() if server_version != VERSION: print("Incorrect server version!") print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8')) return False return True
def _api_get(self, path): ctx = http.client.ssl._create_stdlib_context() conn = http.client.HTTPSConnection(self._api, timeout=20, context=ctx) log.info("calling {}/<**token**>{}".format(self._api, path)) tpath = '/' + self._api_token + path # Do not leak out token in production # log.info("calling {}{}".format(self._api, tpath)) try: conn.request('GET', tpath) response = conn.getresponse().read().decode('utf-8') except socket.timeout: log.info("socket timeout") raise APIError('testbed daemon socket timeout') j = self._parse(response) if j["status"] != "ok": raise APIError(response) return j
def fetch_and_mangle_worker_autoyast(admin_host_ipaddr): """Fetch autoyast file and upload it to /srv/www/htdocs/autoyast/caasp/worker_mangled.xml """ assert admin_host_ipaddr log.info("Fetching autoyast file from {}".format(admin_host_ipaddr)) ctx = http.client.ssl._create_stdlib_context() conn = http.client.HTTPSConnection(admin_host_ipaddr, timeout=20, context=ctx) conn.request('GET', AUTOYAST_URL_PATH) ay = conn.getresponse().read().decode('utf-8') assert '<pattern>SUSE-CaaSP-Stack</pattern>' in ay, ay xml = ay.replace('</storage>\n', '</storage>\n' + AUTOYAST_SIG_CHUNK) # Inject SSH key into authorized keys to allow SSHing to workers xml = xml.replace( ' </chroot-scripts>\n', AUTOYAST_AUTHORIZED_KEYS_CHUNK + ' </chroot-scripts>\n' ) tsclient.upload_worker_mangled_xml(xml)
def install_prometheus_certs(kubeconfig): log.info("Fetching API client keys for Prometheus") y = yaml.load(kubeconfig) k = base64.b64decode(y['users'][0]['user']['client-key-data']) with open('client_key', 'w') as f: f.write(k.decode()) subprocess.check_call(["/usr/bin/sudo", "scp", "client_key", PROMETHEUS_SSH + ":/srv/prometheus/prometheus/kube_api_client_key"]) c = base64.b64decode(y['users'][0]['user']['client-certificate-data']) with open('client_cert', 'w') as f: f.write(c.decode()) subprocess.check_call(["/usr/bin/sudo", "scp", "client_cert", PROMETHEUS_SSH + ":/srv/prometheus/prometheus/kube_api_client_cert"]) log.info("Reloading Prometheus") subprocess.check_call(["/usr/bin/sudo", "ssh", PROMETHEUS_SSH, "/usr/bin/systemctl", "reload", "prometheus.service"])
def __init__(self, host, port=None, key_file=None, cert_file=None, timeout=None, proxy_info=None, ca_certs=None, disable_ssl_certificate_validation=False): self.proxy_info = proxy_info context = None if ca_certs is None: ca_certs = CA_CERTS if (cert_file or ca_certs) and not disable_ssl_certificate_validation: if not hasattr(ssl, 'SSLContext'): raise CertificateValidationUnsupportedInPython31() context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED if cert_file: context.load_cert_chain(cert_file, key_file) if ca_certs: context.load_verify_locations(ca_certs) http.client.HTTPSConnection.__init__( self, host, port=port, key_file=key_file, cert_file=cert_file, timeout=timeout, context=context, check_hostname=True)
def handle_expect_100(self): """Decide what to do with an "Expect: 100-continue" header. If the client is expecting a 100 Continue response, we must respond with either a 100 Continue or a final response before waiting for the request body. The default is to always respond with a 100 Continue. You can behave differently (for example, reject unauthorized requests) by overriding this method. This method should either return True (possibly after sending a 100 Continue response) or send an error response and return False. """ self.send_response_only(100) return True
def log_message(self, format, *args): """Log an arbitrary message. This is used by all other logging functions. Override it if you have specific logging wishes. The first argument, FORMAT, is a format string for the message to be logged. If the format string contains any % escapes requiring parameters, they should be specified as subsequent arguments (it's just like printf!). The client host and current date/time are prefixed to every message. """ sys.stderr.write("%s - - [%s] %s\n" % (self.address_string(), self.log_date_time_string(), format%args))
def urloption(url, headers={}, retries=RETRIES): '''??OPTION ??''' headers_merged = default_headers.copy() for key in headers.keys(): headers_merged[key] = headers[key] schema = urllib.parse.urlparse(url) for i in range(retries): try: conn = http.client.HTTPConnection(schema.netloc) conn.request('OPTIONS', url, headers=headers_merged) resp = conn.getresponse() return resp except OSError: logger.error(traceback.format_exc()) #return None except: logger.error(traceback.format_exc()) #return None return None
def urlopen_without_redirect(url, headers={}, data=None, retries=RETRIES): '''????URL, ?????Response??. ??????. ??????????URL???(Error 301/302)????, ?????URL?? ???????, ??Header????????. ''' headers_merged = default_headers.copy() for key in headers.keys(): headers_merged[key] = headers[key] parse_result = urllib.parse.urlparse(url) for i in range(retries): try: conn = http.client.HTTPConnection(parse_result.netloc) if data: conn.request('POST', url, body=data, headers=headers_merged) else: conn.request('GET', url, body=data, headers=headers_merged) return conn.getresponse() except OSError: logger.error(traceback.format_exc()) except: logger.error(traceback.format_exc()) #return None return None
def create_time_annotation(self, service_name, ipv4, port, timestamp, value): """ Creates an annotation thrift object :param service_name: name that identifies the service of the endpoint :param ipv4: ipv4 address of the endpoint :param port: port of the service :param timestamp: timestamp of when the annotation occured in microseconds :param value: Zipkin value for the annotation, like CS (client start), SS (server send) or SR (server receive) :returns: zipkin annotation object """ return ({ "endpoint": { "serviceName": service_name, "ipv4": ipv4, "port": port }, "timestamp": timestamp, "value": value })
def _make_http_response(self, endpoint, request): content = request.get_content() method = request.get_method() header = request.get_signed_header(self.get_region_id(), self.get_access_key(), self.get_access_secret()) if self.get_user_agent() is not None: header['User-Agent'] = self.get_user_agent() header['x-sdk-client'] = 'python/2.0.0' if header is None: header = {} protocol = request.get_protocol_type() url = request.get_url(self.get_region_id(), self.get_access_key(), self.get_access_secret()) response = HttpResponse(endpoint, url, method, header, protocol, content, self._port) return response
def do_action_with_exception(self, acs_request): # set server response format as json, because thie function will # parse the response so which format doesn't matter acs_request.set_accept_format('json') status, headers, body = self._implementation_of_do_action(acs_request) request_id = None ret = body try: body_obj = json.loads(body.decode('utf-8')) request_id = body_obj.get('RequestId') ret = body_obj except ValueError: # in case the response body is not a json string, return the raw data instead pass if status != http.client.OK: server_error_code, server_error_message = self._parse_error_info_from_response_body(body) raise ServerException(server_error_code, server_error_message, http_status=status, request_id=request_id) return body
def setup(self): # SSLify.. if self.server.sslkey: import ssl sslca = self.server.sslca keyfile = self.server.sslkey certfile = self.server.sslcrt sslreq = ssl.CERT_NONE # If they specify a CA key, require valid client certs if sslca: sslreq=ssl.CERT_REQUIRED self.request = ssl.wrap_socket(self.request, keyfile=keyfile, certfile=certfile, ca_certs=sslca, cert_reqs=sslreq, server_side=True) http.server.BaseHTTPRequestHandler.setup(self)
def handleLogin(self, objname, authinfo): if not self.server.getSharedObject(objname): raise Exception('Shared object does not exists!') if self.server.authmod: if not authinfo or not self.server.authmod.authCobraUser(authinfo): self.send_response(http.client.UNAUTHORIZED) self.end_headers() else: sesskey = base64.b64encode( os.urandom(32) ) self.server.sessions[ sesskey ] = (authinfo, time.time()) c = http.cookies.SimpleCookie() c['SessionId'] = sesskey # set morsel self.send_response(http.client.OK) self.send_header('Set-Cookie', list(c.values())[0].output(header='')) self.send_header("Content-type", "text/html") self.end_headers() return self.send_response(http.client.OK) self.end_headers()
def handleGetAttr(self, objname, attr): if not self.server.attr: self.send_response(http.client.FORBIDDEN) self.end_headers() excinfo = "__getattr__ disabled" self.wfile.write(json.dumps(excinfo)) return if verbose: print("GETTING ATTRIBUTE:", attr) obj = self.server.getSharedObject(objname) try: val = getattr(obj, attr) self.send_response(http.client.OK) self.end_headers() self.wfile.write(json.dumps(val)) except Exception as e: self.send_response(http.client.NOT_FOUND) self.end_headers() excinfo = "%s" % traceback.format_exc() self.wfile.write(json.dumps(excinfo))
def __init__(self, host, port=None, key_file=None, cert_file=None, timeout=None, proxy_info=None, ca_certs=None, disable_ssl_certificate_validation=False): self.proxy_info = proxy_info context = None if ca_certs is None: ca_certs = CA_CERTS if (cert_file or ca_certs) and not disable_ssl_certificate_validation: if not hasattr(ssl, 'SSLContext'): raise CertificateValidationUnsupportedInPython31() context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode = ssl.CERT_REQUIRED if cert_file: context.load_cert_chain(cert_file, key_file) if ca_certs: context.load_verify_locations(ca_certs) http.client.HTTPSConnection.__init__( self, host, port=port, key_file=key_file, cert_file=cert_file, timeout=timeout, context=context, check_hostname=False)
def SetXTwitterHeaders(self, client, url, version): """Set the X-Twitter HTTP headers that will be sent to the server. Args: client: The client name as a string. Will be sent to the server as the 'X-Twitter-Client' header. url: The URL of the meta.xml as a string. Will be sent to the server as the 'X-Twitter-Client-URL' header. version: The client version as a string. Will be sent to the server as the 'X-Twitter-Client-Version' header. """ self._request_headers['X-Twitter-Client'] = client self._request_headers['X-Twitter-Client-URL'] = url self._request_headers['X-Twitter-Client-Version'] = version
def verify_request(self, sock, ca): # Do the sspi auth dance self.sa.reset() while 1: data = _get_msg(sock) if data is None: return False try: err, sec_buffer = self.sa.authorize(data) except sspi.error as details: print("FAILED to authorize client:", details) return False if err==0: break _send_msg(sock, sec_buffer[0].Buffer) return True
def process_request(self, request, client_address): # An example using the connection once it is established. print("The server is running as user", GetUserName()) self.sa.ctxt.ImpersonateSecurityContext() try: print("Having conversation with client as user", GetUserName()) while 1: # we need to grab 2 bits of data - the encrypted data, and the # 'key' data = _get_msg(request) key = _get_msg(request) if data is None or key is None: break data = self.sa.decrypt(data, key) print("Client sent:", repr(data)) finally: self.sa.ctxt.RevertSecurityContext() self.close_request(request) print("The server is back to user", GetUserName())
def sspi_client(): c = http.client.HTTPConnection("localhost", options.port) c.connect() # Do the auth dance. ca = sspi.ClientAuth(options.package, targetspn=options.target_spn) data = None while 1: err, out_buf = ca.authorize(data) _send_msg(c.sock, out_buf[0].Buffer) if err==0: break data = _get_msg(c.sock) print("Auth dance complete - sending a few encryted messages") # Assume out data is sensitive - encrypt the message. for data in "Hello from the client".split(): blob, key = ca.encrypt(data) _send_msg(c.sock, blob) _send_msg(c.sock, key) c.sock.close() print("Client completed.")
def handle_expect_100(self): """Decide what to do with an "Expect: 100-continue" header. If the client is expecting a 100 Continue response, we must respond with either a 100 Continue or a final response before waiting for the request body. The default is to always respond with a 100 Continue. You can behave differently (for example, reject unauthorized requests) by overriding this method. This method should either return True (possibly after sending a 100 Continue response) or send an error response and return False. """ self.send_response_only(100) self.end_headers() return True
def log_message(self, format, *args): """Log an arbitrary message. This is used by all other logging functions. Override it if you have specific logging wishes. The first argument, FORMAT, is a format string for the message to be logged. If the format string contains any % escapes requiring parameters, they should be specified as subsequent arguments (it's just like printf!). The client ip and current date/time are prefixed to every message. """ sys.stderr.write("%s - - [%s] %s\n" % (self.address_string(), self.log_date_time_string(), format%args))
def getproxies_environment(): """Return a dictionary of scheme -> proxy server URL mappings. Scan the environment for variables named <scheme>_proxy; this seems to be the standard convention. If you need a different way, you can pass a proxies dictionary to the [Fancy]URLopener constructor. """ proxies = {} for name, value in os.environ.items(): name = name.lower() if value and name[-6:] == '_proxy': proxies[name[:-6]] = value # CVE-2016-1000110 - If we are running as CGI script, forget HTTP_PROXY # (non-all-lowercase) as it may be set from the web server by a "Proxy:" # header from the client if 'REQUEST_METHOD' in os.environ: proxies.pop('http', None) return proxies
def __init__(self, host, port, user, password, database, settings, format, format_stdin, multiline, stacktrace): self.config = None self.host = host self.port = port self.user = user self.password = password self.database = database self.settings = {k: v[0] for k, v in parse_qs(settings).items()} self.format = format self.format_stdin = format_stdin self.multiline = multiline self.stacktrace = stacktrace self.server_version = None self.query_ids = [] self.client = None self.echo = Echo(verbose=True, colors=True) self.progress = None self.metadata = {}
def test_get_a_run(self): url = reverse('testruns-get', args=[self.test_run.pk]) response = self.client.get(url) self.assertEqual(http.client.OK, response.status_code) for i, case_run in enumerate( (self.case_run_1, self.case_run_2, self.case_run_3), 1): self.assertContains( response, '<a href="#caserun_{0}">#{0}</a>'.format(case_run.pk), html=True) self.assertContains( response, '<a id="link_{0}" href="#caserun_{1}" title="Expand test case">' '{2}</a>'.format(i, case_run.pk, case_run.case.summary), html=True)
def test_show_create_new_run_page(self): self.client.login(username=self.tester.username, password='password') response = self.client.post(self.url, { 'from_plan': self.plan.pk, 'case': [self.case_1.pk, self.case_2.pk, self.case_3.pk] }) # Assert listed cases for i, case in enumerate((self.case_1, self.case_2, self.case_3), 1): self.assertContains( response, '<a href="/case/{0}/">{0}</a>'.format(case.pk), html=True) self.assertContains( response, '<a id="link_{0}" class="blind_title_link js-case-summary" ' 'data-param="{0}">{1}</a>'.format(i, case.summary), html=True)
def test_add_env_value_to_runs(self): self.login_tester() self.client.get(self.env_value_url, { 'a': 'add', 'env_value_id': self.value_bsd.pk, 'run_id': [self.test_run.pk, self.test_run_1.pk] }) rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run, value=self.value_bsd) self.assertTrue(rel.exists()) rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run_1, value=self.value_bsd) self.assertTrue(rel.exists())
def test_delete_env_value_from_runs(self): self.login_tester() self.client.get(self.env_value_url, { 'a': 'remove', 'env_value_id': self.value_linux.pk, 'run_id': [self.test_run.pk, self.test_run_1.pk], }) rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run, value=self.value_linux) self.assertFalse(rel.exists()) rel = TCMSEnvRunValueMap.objects.filter(run=self.test_run_1, value=self.value_linux) self.assertFalse(rel.exists())
def test_refuse_if_action_is_unknown(self): self.login_tester() post_data = { 'a': 'unknown action', 'case_run': self.case_run_1.pk, 'case': self.case_run_1.case.pk, 'bug_system_id': TestCaseBugSystem.objects.get(name='Bugzilla').pk, 'bug_id': '123456', } response = self.client.get(self.case_run_bug_url, post_data) self.assertJsonResponse( response, {'rc': 1, 'response': 'Unrecognizable actions'})
def test_remove_bug_from_case_run(self): self.login_tester() post_data = { 'a': 'remove', 'case_run': self.case_run_1.pk, 'bug_id': self.bug_12345, } response = self.client.get(self.case_run_bug_url, post_data) bug_exists = TestCaseBug.objects.filter( bug_id=self.bug_12345, case=self.case_run_1.case, case_run=self.case_run_1).exists() self.assertFalse(bug_exists) self.assertJsonResponse( response, {'rc': 0, 'response': 'ok', 'run_bug_count': 1})