我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用grpc.ssl_channel_credentials()。
def create_grpc_channel(target, pem=None, opts=None):
    """Build a gRPC channel to ``target``, secured only when a PEM is given.

    Args:
        target: address of the server, including ``host:port``.
        pem: SSL/TLS PEM file contents as bytes; ``None`` selects an
            insecure channel.
        opts: gRPC channel options, for example
            ``grpc.default_authority`` (default authority) or
            ``grpc.ssl_target_name_override`` (SSL target name override).

    Returns:
        The channel returned by ``grpc.secure_channel`` when ``pem`` is
        provided, otherwise the one returned by ``grpc.insecure_channel``.
    """
    if pem is not None:
        # A certificate was supplied: wrap it in SSL channel credentials.
        return grpc.secure_channel(
            target, grpc.ssl_channel_credentials(pem), opts)
    return grpc.insecure_channel(target, opts)
def _get_secure_creds(self, ca_cert, cert_key=None, cert_cert=None):
    """Load certificate files and turn them into SSL channel credentials.

    Args:
        ca_cert: path of the CA certificate file; always read.
        cert_key: optional path of the private key file.
        cert_cert: optional path of the certificate chain file.

    Returns:
        The result of ``grpc.ssl_channel_credentials`` called with the
        contents of the three files (``None`` for any optional file whose
        path was not given).
    """
    def _read_optional(path):
        # Optional inputs stay None when no path was provided.
        if path is None:
            return None
        with open(path, 'rb') as handle:
            return handle.read()

    # The CA certificate is mandatory, so it is opened unconditionally.
    with open(ca_cert, 'rb') as handle:
        root_pem = handle.read()
    key_pem = _read_optional(cert_key)
    chain_pem = _read_optional(cert_cert)

    return grpc.ssl_channel_credentials(root_pem, key_pem, chain_pem)
def get_channel(self):
    """Return the secure gRPC channel to the device, creating it on first use.

    The channel is cached in ``self.channel``; later calls return the cached
    object without touching the certificate files again.

    Returns:
        The ``grpc`` secure channel stored in ``self.channel``.

    Raises:
        Exception: whatever error occurred while reading the certificate or
            key files. It is logged and re-raised, because no credentials
            can be built without that material.
    """
    if self.channel is None:
        device = self.adapter_agent.get_device(self.device_id)

        # Read the PEM material as bytes, which is the form
        # grpc.ssl_channel_credentials expects.
        try:
            with open('/voltha/pki/voltha-CA.pem', 'rb') as f:
                trusted_certs = f.read()

            with open('/voltha/pki/voltha.crt', 'rb') as f:
                client_cert = f.read()

            with open('/voltha/pki/voltha.key', 'rb') as f:
                client_key = f.read()
        except Exception as e:
            log.error('failed-to-read-cert-keys', reason=e)
            # Propagate the real cause instead of falling through to an
            # UnboundLocalError on the variables that were never assigned.
            raise

        # Create credentials for mutual TLS.
        credentials = grpc.ssl_channel_credentials(
            root_certificates=trusted_certs,
            private_key=client_key,
            certificate_chain=client_cert)

        # Create the channel using the SSL credentials.
        # Server's CN name; ugly, but no other choice.
        my_server_host_override_string = "ABCD"
        self.channel = grpc.secure_channel(
            device.host_and_port,
            credentials,
            options=(('grpc.ssl_target_name_override',
                      my_server_host_override_string,),))

    return self.channel
def create_grpc_channel(target, credentials, ssl_credentials_file=None,
                        grpc_channel_options=None):
    """Create and return a gRPC channel.

    Args:
        target(str): Address of the service the channel connects to.
        credentials(google.oauth2.credentials.Credentials): OAuth2 credentials.
        ssl_credentials_file(str): Path to SSL credentials.pem file
            (for testing).
        grpc_channel_options([(option_name, option_val)]): gRPC channel
            options. Defaults to an empty list.

    Returns:
        grpc.Channel.
    """
    # Use a None sentinel rather than a mutable default shared across calls.
    if grpc_channel_options is None:
        grpc_channel_options = []

    ssl_credentials = None
    if ssl_credentials_file:
        # PEM data must be handed to gRPC as bytes, so read in binary mode.
        with open(ssl_credentials_file, 'rb') as f:
            ssl_credentials = grpc.ssl_channel_credentials(f.read())

    http_request = google.auth.transport.requests.Request()
    # TODO(proppy): figure out if/why we need to force a refresh.
    # if yes, consider remove access token from serialized credentials.
    credentials.refresh(http_request)

    return google.auth.transport.grpc.secure_authorized_channel(
        credentials, http_request, target,
        ssl_credentials=ssl_credentials,
        options=grpc_channel_options)
def __init__(self, rpc_port):
    """Open a TLS channel to the local RPC port and build the Lightning stub.

    Args:
        rpc_port: port on ``localhost`` that the gRPC server listens on.

    The root certificate is read from ``tls.cert`` in the current working
    directory.
    """
    self.port = rpc_port

    # Read the certificate as bytes (the form gRPC expects) and close the
    # file deterministically instead of leaking the handle.
    with open('tls.cert', 'rb') as cert_file:
        cred = grpc.ssl_channel_credentials(cert_file.read())

    channel = grpc.secure_channel('localhost:{}'.format(rpc_port), cred)
    self.stub = lnrpc_grpc.LightningStub(channel)
def getGRPCChannel(ipAddress, port, root_certificates, ssl_target_name_override):
    """Open a TLS-secured gRPC channel to ``ipAddress:port``.

    Args:
        ipAddress: host part of the target address.
        port: port part of the target address.
        root_certificates: root certificates passed to
            ``grpc.ssl_channel_credentials``.
        ssl_target_name_override: value used for both the
            ``grpc.ssl_target_name_override`` and ``grpc.default_authority``
            channel options.

    Returns:
        The channel created by ``grpc.secure_channel``.
    """
    tls_creds = grpc.ssl_channel_credentials(root_certificates=root_certificates)

    # Receive limit is raised to 100 MiB alongside the name overrides.
    channel_options = (
        ('grpc.ssl_target_name_override', ssl_target_name_override),
        ('grpc.default_authority', ssl_target_name_override),
        ('grpc.max_receive_message_length', 100*1024*1024),
    )
    return grpc.secure_channel(
        "{0}:{1}".format(ipAddress, port), tls_creds, options=channel_options)
def run():
    """Call the Greeter service through the load balancer every 3 seconds.

    Builds a TLS channel to ``lb:50051`` using ``/certs/cert.pem`` as the
    root certificate, then loops forever sending ``SayHello`` requests and
    logging either the reply or the failure.
    """
    # Use a context manager so the certificate file handle is closed
    # promptly instead of being left to the garbage collector.
    with open('/certs/cert.pem', 'rb') as cert_file:
        root_certificates = cert_file.read()

    creds = grpc.ssl_channel_credentials(root_certificates=root_certificates)
    channel = grpc.secure_channel('lb:50051', creds)
    stub = helloworld_pb2_grpc.GreeterStub(channel)

    while True:
        try:
            response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
            # NOTE(review): the success message is logged at error level in
            # the original; kept as-is in case log filtering relies on it.
            logger.error("Greeter client received: " + response.message)
        except Exception as e:
            logger.error('Could not connect load-balancer. error {}'.format(e))
        # Pause between attempts whether or not the call succeeded.
        time.sleep(3)
def run():
    """Connect to the users service over TLS, create a user and list users.

    Exits the process with an error message if the channel to
    ``localhost:50051`` is not ready within ten seconds.
    """
    # Load the server certificate that serves as the trusted root.
    with open('server.crt') as cert_file:
        root_pem = cert_file.read().encode()

    tls_credentials = grpc.ssl_channel_credentials(root_certificates=root_pem)
    channel = grpc.secure_channel('localhost:50051', tls_credentials)

    # Block until the channel is usable; give up after the timeout.
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')

    stub = users_service.UsersStub(channel)
    call_metadata = [('ip', '127.0.0.1')]

    try:
        created = stub.CreateUser(
            users_messages.CreateUserRequest(username='tom'),
            metadata=call_metadata,
        )
    except grpc.RpcError as e:
        print('CreateUser failed with {0}: {1}'.format(e.code(), e.details()))
    else:
        print("User created:", created.user.username)

    # NOTE(review): assumed to run regardless of the CreateUser outcome;
    # the original indentation was lost, so confirm against upstream.
    lookup = users_messages.GetUsersRequest(
        user=[
            users_messages.User(username="alexa", user_id=1),
            users_messages.User(username="christie", user_id=1),
        ]
    )
    for resp in stub.GetUsers(lookup):
        print(resp)
def secure_authorized_channel(
        credentials, request, target, ssl_credentials=None, **kwargs):
    """Creates a secure authorized gRPC channel.

    The channel combines SSL channel credentials with call credentials
    backed by :class:`AuthMetadataPlugin`, so stubs created on it make
    authorized requests.

    Example::

        import google.auth
        import google.auth.transport.grpc
        import google.auth.transport.requests

        # Get credentials.
        credentials, _ = google.auth.default()

        # Get an HTTP request function to refresh credentials.
        request = google.auth.transport.requests.Request()

        # Create a channel (note the argument order: request, then target).
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, 'speech.googleapis.com:443')

        # The channel can now be handed to a generated service stub.

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            add to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object used to refresh credentials as needed. Even though gRPC
            is a separate transport, there's no way to refresh the
            credentials without using a standard http transport.
        target (str): The host and port of the service.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            When omitted, ``grpc.ssl_channel_credentials()`` is called with
            no arguments.
        kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.

    Returns:
        grpc.Channel: The created gRPC channel.
    """
    # Call credentials that insert the authorization header via the plugin.
    call_creds = grpc.metadata_call_credentials(
        AuthMetadataPlugin(credentials, request))

    # Fall back to gRPC's default SSL credentials when none were supplied.
    channel_creds = (
        grpc.ssl_channel_credentials()
        if ssl_credentials is None
        else ssl_credentials
    )

    # Merge transport security and authorization into one credentials object.
    combined = grpc.composite_channel_credentials(channel_creds, call_creds)
    return grpc.secure_channel(target, combined, **kwargs)