我们从Python开源项目中，提取了以下15个代码示例，用于说明如何使用ssl.get_server_certificate()。
def inspect(host, port):
    """Probe ``https://host:port`` and classify the state of its certificate.

    A normal ``requests.get`` (2 second timeout) is tried first. When it
    fails with an SSL error, the certificate is fetched separately with
    ``ssl.get_server_certificate`` to work out why.

    Returns:
        ERR_NONE when the request succeeds.
        ERR_HOST_NOT_MATCH when the SSL error message starts with
        ``hostname``.
        ERR_EXPIRED when the current UTC time is outside the certificate's
        notBefore/notAfter window.
        ERR_SELF_SIGNED for any other SSL error.
        ERR_TIMEOUT on a connection error or timeout.
        ERR_UNKNOWN when the certificate cannot be fetched or on any other
        error raised by the request.
    """
    try:
        requests.get('https://%s:%s' % (host, port), timeout=2)
    except SSLError as e:
        errmsg = str(e)

        # CHECK ERR_HOST_NOT_MATCH
        if errmsg.startswith('hostname'):
            return ERR_HOST_NOT_MATCH

        try:
            raw_cert = ssl.get_server_certificate((host, port))
        except Exception:
            # Best effort: any failure to fetch the certificate is "unknown".
            # ``Exception`` (not a bare except) so Ctrl-C still interrupts.
            return ERR_UNKNOWN
        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, raw_cert)

        # CHECK ERR_EXPIRED
        # The timestamps end in "Z" (UTC), so compare against UTC "now"
        # rather than local time.
        date_format = "%Y%m%d%H%M%SZ"
        now = datetime.utcnow()
        not_after = datetime.strptime(
            x509.get_notAfter().decode('utf-8'), date_format)
        not_before = datetime.strptime(
            x509.get_notBefore().decode('utf-8'), date_format)
        if now > not_after or now < not_before:
            return ERR_EXPIRED

        # otherwise ERR_SELF_SIGNED
        return ERR_SELF_SIGNED
    except (ConnectionError, Timeout):
        return ERR_TIMEOUT
    except Exception:
        return ERR_UNKNOWN
    return ERR_NONE
def check(hostname="your_server.com", port=443, num_days=7):
    """Print an alert when a server's certificate is close to expiring.

    Args:
        hostname: Host to fetch the certificate from.
        port: TCP port of the TLS endpoint.
        num_days: Alert when this many days or fewer remain.

    Raises:
        AssertionError: If the certificate reports no expiry date.
    """
    # No ssl_version is forced: the original pinned the deprecated
    # ssl.PROTOCOL_TLSv1, which many current servers refuse.
    cert = ssl.get_server_certificate((hostname, port))
    x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)

    expiry_date = x509.get_notAfter()
    print(str(expiry_date))
    # Raised explicitly so the check is not stripped under ``python -O``.
    if not expiry_date:
        raise AssertionError("Cert doesn't have an expiry date.")

    ssl_date_fmt = r'%Y%m%d%H%M%SZ'
    # expiry_date is bytes; decode it instead of slicing its repr.
    expires = datetime.datetime.strptime(
        expiry_date.decode('ascii'), ssl_date_fmt)
    remaining = (expires - datetime.datetime.utcnow()).days

    if remaining <= num_days:
        print("ALERTING!")
    else:
        print("Not alerting. You have " + str(remaining) + " days")
def get_ssl_certificate(self, ssl_version=None):
    """
    Fetch the endpoint's certificate in PEM form and load it with OpenSSL.

    :param ssl_version: Name of an attribute of the ``ssl`` module to pass as
    the protocol version. If None, no version is passed and the ``ssl``
    module's default is used.
    :return: A tuple containing (1) the PEM certificate string and (2) the
    certificate object loaded from it by OpenSSL.
    :raises SslCertificateRetrievalFailedError: If an ``ssl.SSLError`` occurs.
    """
    endpoint = (self.address, self.port)
    try:
        fetch_options = {}
        if ssl_version is not None:
            fetch_options["ssl_version"] = getattr(ssl, ssl_version)
        pem_text = ssl.get_server_certificate(endpoint, **fetch_options)
        loaded = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, pem_text)
        return pem_text, loaded
    except ssl.SSLError as e:
        raise SslCertificateRetrievalFailedError(
            message="SSL error thrown when retrieving SSL certificate: %s." % (e,)
        )

# Protected Methods

# Private Methods
def _certificate_required(cls, hostname, port=XCLI_DEFAULT_PORT,
                          ca_certs=None, validate=None):
    '''
    Tell whether the connection should verify the server certificate.

    Returns False when no ca_certs is given. Otherwise the server
    certificate is fetched and, if a validate callable was supplied, the
    result is the negation of validate(certificate); without one the
    result is True.
    '''
    if ca_certs:
        xlog.debug("CONNECT SSL %s:%s, cert_file=%s",
                   hostname, port, ca_certs)
        # The certificate is fetched with ca_certs=None, i.e. without
        # checking it against the given CA file at this point.
        server_cert = ssl.get_server_certificate((hostname, port),
                                                 ca_certs=None)
        # handle XIV pre-defined certifications
        # A user-supplied validate function gets to inspect the
        # certificate; when it approves (returns True) no further
        # verification is needed, hence the negation.
        if not validate:
            return True
        return not validate(server_cert)
    return False
def _get_cert_from_endpoint(server, port=443):
    """Return the certificate served at (server, port), or None.

    Any exception raised while fetching is logged as an error and turned
    into a None result; an empty certificate also yields None.
    """
    try:
        pem = ssl.get_server_certificate((server, port))
    except Exception:
        log.error('Unable to retrieve certificate from {0}'.format(server))
        return None
    return pem if pem else None
def _fetch_and_store_cert(self, server, port, path):
    """Fetch the certificate served at (server, port) and write it to path.

    Returns the fetched certificate. Any exception raised while fetching
    is re-raised as cfg.Error carrying the server name and error text.
    """
    try:
        pem = ssl.get_server_certificate((server, port))
    except Exception as e:
        template = _('Could not retrieve initial '
                     'certificate from controller %(server)s. '
                     'Error details: %(error)s')
        raise cfg.Error(template % {'server': server, 'error': str(e)})

    notice = _LW("Storing to certificate for host %(server)s "
                 "at %(path)s")
    LOG.warning(notice % {'server': server, 'path': path})
    self._file_put_contents(path, pem)
    return pem
def save_certificate(self):
    """Fetch the certificate of the host in self.url and write it to self.cafile.

    The port defaults to 443 when self.url has none. The directory part of
    self.cafile is created if it does not exist yet.
    """
    port = self.url.port or 443
    adr = (self.url.hostname, port)
    if self.debug:
        print("save_certificate of %s to %s" % (str(adr), self.cafile))

    capath = os.path.dirname(self.cafile)
    # dirname is '' when cafile is a bare file name; os.makedirs('') would
    # raise, and there is nothing to create in that case.
    if capath and not os.path.exists(capath):
        os.makedirs(capath)

    cert = ssl.get_server_certificate(adr)
    with open(self.cafile, "w") as outfile:
        outfile.write(cert)
def get_vc_fingerprint(vcenter_ip):
    """Return the SHA-1 fingerprint of the certificate at vcenter_ip.

    The certificate is fetched from (vcenter_ip, VCENTER_PORT) and its
    fingerprint string is returned as two-character groups joined by ':'.
    """
    endpoint = (vcenter_ip, VCENTER_PORT)
    # NOTE(review): TLS 1.0 is forced here; ssl.PROTOCOL_TLSv1 is deprecated
    # and servers with TLS 1.0 disabled will refuse it - confirm it is
    # still needed for the targeted vCenter versions.
    pem_text = ssl.get_server_certificate(endpoint,
                                          ssl_version=ssl.PROTOCOL_TLSv1)
    certificate = X509.load_cert_string(pem_text, X509.FORMAT_PEM)
    digest_hex = certificate.get_fingerprint('sha1')
    # Complete character pairs only; a trailing unpaired character is dropped.
    pairs = [digest_hex[i:i + 2] for i in range(0, len(digest_hex) - 1, 2)]
    return ':'.join(pairs)
def process(self, profile, state, vertex):
    """Collect the server certificate for a "domain" vertex.

    Returns {'error': ...} when the vertex has no 'type' key. Otherwise
    returns a dict with an empty "neighbors" list and a "properties" dict.
    For a vertex of type "domain", "properties" holds the certificate under
    'content' and the elapsed fetch time under 'time', or the exception
    text under 'error' if anything in the fetch raised. For other types it
    stays empty.
    """
    if 'type' not in vertex:
        return {'error': "No vertex type defined!"}

    collected = {}
    outcome = {"properties": collected, "neighbors": []}
    if vertex['type'] != "domain":
        return outcome

    try:
        started = time.time()
        endpoint = (vertex['id'], self.ssl_port)
        collected['content'] = ssl.get_server_certificate(endpoint)
        collected['time'] = time.time() - started
    except Exception as e:
        collected['error'] = str(e)
    return outcome
def check_certificate(self, domain):
    """
    Download the TLS certificate served on port 443 of the domain and log
    information about it.

    When self.output is set, the PEM certificate is also saved as
    'cert.pem' in that directory. Subject, validity dates, issuer, serial
    number and selected extensions are logged through self.log.
    """
    pem = ssl.get_server_certificate((domain, 443))
    # get_server_certificate returns text on Python 3, while both the
    # binary-mode file and the PEM loader need bytes.
    pem_bytes = pem if isinstance(pem, bytes) else pem.encode('ascii')
    if self.output:
        with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
            f.write(pem_bytes)
    cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    self.log.critical("\tCertificate:")
    self.log.critical(
        "\t\tDomain: %s", ",".join(attr.value for attr in cert.subject))
    self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
    self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
    self.log.critical(
        "\t\tCA Issuer: %s", ", ".join(attr.value for attr in cert.issuer))
    self.log.critical("\t\tSerial: %s", cert.serial_number)
    for ext in cert.extensions:
        if ext.oid._name == 'basicConstraints':
            # Only logged when the certificate is marked as a CA.
            if ext.value.ca:
                self.log.critical("\t\tBasic Constraints: True")
        elif ext.oid._name == 'subjectAltName':
            self.log.critical(
                "\t\tAlternate names: %s",
                ", ".join(ext.value.get_values_for_type(x509.DNSName)))
def tls(ctx, domain_name):
    """Get the TLS certificate for a domain.

    Example:
        $ lokey fetch tls gliderlabs.com
    """
    # Only the fetch is guarded, so a failure while echoing is not
    # misreported as a fetch failure. ``Exception`` (not a bare except)
    # lets KeyboardInterrupt/SystemExit through.
    try:
        cert = stdlib_ssl.get_server_certificate((domain_name, 443))
    except Exception:
        msg = ("Unable to fetch key from {}, "
               "is that domain configured for TLS?").format(domain_name)
        raise click.ClickException(msg)
    click.echo(cert)
async def get_server_certificate(*args, **kwargs):
    """Awaitable wrapper around ``_ssl.get_server_certificate``.

    All arguments are forwarded unchanged; the call is handed to
    ``run_in_thread`` and its result is returned. Declared ``async``
    because the body awaits, which is a syntax error in a plain ``def``.
    """
    return await run_in_thread(
        partial(_ssl.get_server_certificate, *args, **kwargs))

# Small wrapper class to make sure the wrap_socket() method returns the right type
def onHtml(self, event): fila = self.listadoVM for i in range(len(fila)): if logger != None: logger.info(fila[i]) # El tercer elemento es la ip y el 9 es el UUID vm = conexion.searchIndex.FindByUuid(None,fila[8], True) vm_name = vm.summary.config.name vm_moid = vm._moId if logger != None: logger.info('void= '.format(vm_moid)) vcenter_data = conexion.setting vcenter_settings = vcenter_data.setting console_port = '9443' puerto_vcenter= '443' for item in vcenter_settings: key = getattr(item, 'key') #print ('key: ' + key + ' =>'+ str(getattr(item, 'value'))) if key == 'VirtualCenter.FQDN': vcenter_fqdn = getattr(item, 'value') #if key == 'WebService.Ports.https': #console_port = str(getattr(item, 'value')) host = vcenter_fqdn session_manager = conexion.sessionManager session = session_manager.AcquireCloneTicket() vc_cert = ssl.get_server_certificate((host, int(puerto_vcenter))) vc_pem = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,vc_cert) vc_fingerprint = vc_pem.digest('sha1') if logger != None: logger.info("Open the following URL in your browser to access the " \ "Remote Console.\n" \ "You have 60 seconds to open the URL, or the session" \ "will be terminated.\n") print(str(vcenter_data)) # Locate the version of vcenter the object .version for locate the version of vcenter object_about = conexion.about #For version vcenter 5.5 if object_about.version == '5.5.0': console_portv5 = '7331' URL5 = "http://" + host + ":" + console_portv5 + "/console/?vmId=" \ + str(vm_moid) + "&vmName=" + vm_name + "&host=" + vcenter_fqdn \ + "&sessionTicket=" + session + "&thumbprint=" + vc_fingerprint.decode('utf8') webbrowser.open(URL5, new=1, autoraise=True) #For version vcenter 6.0 and 6.5 if object_about.version == '6.0.0' or object_about.version == '6.5.0': URL = "https://" + host + ":" + console_port + "/vsphere-client/webconsole.html?vmId=" \ + str(vm_moid) + "&vmName=" + vm_name + "&host=" + vcenter_fqdn \ + "&sessionTicket=" + session + "&thumbprint.info=" + 
vc_fingerprint.decode('utf-8') if logger != None: logger.info(URL) webbrowser.open(URL, new=1, autoraise=True) if logger != None: logger.info ("Waiting for 60 seconds, then exit")