我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pyasn1.codec.der.decoder.decode()。
def _load_pkcs1_der(cls, keyfile):
    r'''Loads a public key in PKCS#1 DER format.

    @param keyfile: contents of a DER-encoded file that contains the public
        key.
    @return: a PublicKey object

    First let's construct a DER encoded key:

    >>> import base64
    >>> b64der = 'MAwCBQCNGmYtAgMBAAE='
    >>> der = base64.standard_b64decode(b64der)

    This loads the file:

    >>> PublicKey._load_pkcs1_der(der)
    PublicKey(2367317549, 65537)

    '''

    from pyasn1.codec.der import decoder
    from rsa.asn1 import AsnPubKey

    # decode() returns a (value, remaining bytes) pair; any bytes trailing
    # the key structure are ignored here.
    (pub_key, _) = decoder.decode(keyfile, asn1Spec=AsnPubKey())
    return cls(n=int(pub_key['modulus']), e=int(pub_key['publicExponent']))
def load_pkcs1_openssl_der(cls, keyfile):
    '''Builds a public key from an OpenSSL-style DER-encoded public key.

    The input is decoded against ``OpenSSLPubKey``; its header OID must be
    1.2.840.113549.1.1.1 (rsaEncryption), after which the embedded key is
    handed to ``_load_pkcs1_der``.

    @param keyfile: contents of a DER-encoded file that contains the public
        key, from OpenSSL.
    @return: a PublicKey object
    @raise TypeError: when the header OID is not the RSA encryption OID.
    '''

    from rsa.asn1 import OpenSSLPubKey
    from pyasn1.codec.der import decoder
    from pyasn1.type import univ

    decoded = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
    keyinfo = decoded[0]
    rsa_oid = univ.ObjectIdentifier('1.2.840.113549.1.1.1')

    if keyinfo['header']['oid'] != rsa_oid:
        raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")

    # The first element of the 'key' field is skipped before PKCS#1 parsing
    # (presumably the BIT STRING "unused bits" octet -- confirm against
    # rsa.asn1.OpenSSLPubKey).
    pkcs1_der = keyinfo['key'][1:]
    return cls._load_pkcs1_der(pkcs1_der)
def _key_identifier_from_public_key(public_key):
    """Return the SHA-1 digest of the key's ``subjectPublicKey`` bits.

    The key is serialized to a DER SubjectPublicKeyInfo, decoded again with
    pyasn1, and the contents of its ``subjectPublicKey`` bit string are
    hashed.
    """
    # This is a very slow way to do this.
    der_spki = public_key.public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    decoded, leftover = decoder.decode(
        der_spki, asn1Spec=_SubjectPublicKeyInfo()
    )
    assert not leftover

    # Iterating the univ.BitString yields individual bits, and pyasn1 offers
    # no direct way to get bytes out of it. Fold the bits into one integer
    # (most significant bit first) and convert that integer to bytes.
    accumulated = 0
    for bit in decoded.getComponentByName("subjectPublicKey"):
        accumulated = (accumulated << 1) | bit

    return hashlib.sha1(utils.int_to_bytes(accumulated)).digest()
def _load_pkcs1_der(cls, keyfile):
    """Builds a public key from PKCS#1 DER-encoded bytes.

    :param keyfile: contents of a DER-encoded file that contains the public
        key.
    :return: a PublicKey object

    Start from a DER encoded key:

    >>> import base64
    >>> b64der = 'MAwCBQCNGmYtAgMBAAE='
    >>> der = base64.standard_b64decode(b64der)

    Then load it:

    >>> PublicKey._load_pkcs1_der(der)
    PublicKey(2367317549, 65537)

    """

    from pyasn1.codec.der import decoder
    from rsa.asn1 import AsnPubKey

    # decode() hands back (decoded value, remaining bytes); only the value
    # is used.
    decoded = decoder.decode(keyfile, asn1Spec=AsnPubKey())
    key_fields = decoded[0]

    modulus = int(key_fields['modulus'])
    public_exponent = int(key_fields['publicExponent'])
    return cls(n=modulus, e=public_exponent)
def load_pkcs1_openssl_der(cls, keyfile):
    """Builds a public key from an OpenSSL-style DER-encoded public key.

    The input is decoded against ``OpenSSLPubKey``; its header OID must be
    1.2.840.113549.1.1.1 (rsaEncryption), after which the embedded key is
    handed to ``_load_pkcs1_der``.

    :param keyfile: contents of a DER-encoded file that contains the public
        key, from OpenSSL.
    :return: a PublicKey object
    :raise TypeError: when the header OID is not the RSA encryption OID.
    """

    from rsa.asn1 import OpenSSLPubKey
    from pyasn1.codec.der import decoder
    from pyasn1.type import univ

    decoded = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
    keyinfo = decoded[0]
    expected_oid = univ.ObjectIdentifier('1.2.840.113549.1.1.1')

    if keyinfo['header']['oid'] != expected_oid:
        raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")

    # The first element of the 'key' field is skipped before PKCS#1 parsing
    # (presumably the BIT STRING "unused bits" octet -- confirm against
    # rsa.asn1.OpenSSLPubKey).
    embedded_key = keyinfo['key'][1:]
    return cls._load_pkcs1_der(embedded_key)
def getCNfromSSLSock(self, sslSock):
    """Return the subject Common Name of the peer certificate.

    The DER form of the peer certificate is read from ``sslSock``, decoded
    with pyasn1, and the subject's RDN sequence is searched for the
    attribute type 2.5.4.3 (id-at-commonName).

    Returns the first non-None alternative of the CN value as ``str``, or
    None when the subject carries no CN attribute (previously this case
    raised UnboundLocalError) or when the value has no populated entry.
    """
    derCert = sslSock.getpeercert(True)
    self.log.debug("CN Extraction: Doing Stuff")

    # The M2Crypto load is kept so an unparseable certificate still fails
    # here; the public key goes to the debug log instead of being printed
    # to stdout.
    m2crypt_cert = M2Crypto.X509.load_cert_der_string(derCert)
    self.log.debug("CN Extraction: peer public key %s" % (m2crypt_cert.get_pubkey(),))

    certificate = decoder.decode(derCert, asn1Spec=certType)[0]
    subject = certificate.getComponentByName('tbsCertificate').getComponentByName('subject')
    rdn_sequence = subject.getComponentByType(RDNSequence().getTagSet())

    cn = None
    for rdn in rdn_sequence:
        # Only the first attribute of each RDN is inspected.
        if rdn[0].getComponentByName('type') == (2, 5, 4, 3):
            cn = rdn[0].getComponentByName('value')
            break

    if cn is None:
        return None

    for item in cn:
        if item is not None:
            return str(item)
    return None
def decodeChallengeSecurityBlob(data):
    """Decode an NTLMSSP_CHALLENGE security blob.

    ``data`` is decoded as a ``NegotiationToken`` and the ``negTokenTarg``
    component is inspected.

    Returns a tuple ``(int(negResult), str(responseToken))``.

    Raises BadSecurityBlobError when the blob has no ``responseToken`` or
    when decoding/conversion fails for any other reason (the original error
    text becomes the message). Errors that already are
    BadSecurityBlobError instances (including subclasses) propagate
    unchanged; UnsupportedSecurityProvider therefore keeps its own type if
    it derives from BadSecurityBlobError, and is wrapped otherwise.
    """
    try:
        d, _ = decoder.decode(data, asn1Spec=NegotiationToken())
        nt = d.getComponentByName('negTokenTarg')

        token = nt.getComponentByName('responseToken')
        if not token:
            raise BadSecurityBlobError('NTLMSSP_CHALLENGE security blob does not contain responseToken field')

        provider_oid = nt.getComponentByName('supportedMech')
        # This OID is defined in [MS-NLMP]: 1.9
        if provider_oid and str(provider_oid) != '1.3.6.1.4.1.311.2.2.10':
            raise UnsupportedSecurityProvider('Security provider "%s" is not supported by pysmb' % str(provider_oid))

        result = nt.getComponentByName('negResult')
        return int(result), str(token)
    except BadSecurityBlobError:
        # Already the error type callers expect; do not flatten it.
        raise
    except Exception as ex:
        raise BadSecurityBlobError(str(ex))
def parseOcspResponse(ocspResponse):
    """Extract the key fields of the first single response in an OCSP reply.

    Asserts that the response status is 'successful' and that the response
    type is id-pkix-ocsp-basic, then DER-decodes the embedded
    BasicOCSPResponse.

    Returns a 4-tuple: (producedAt, certID, name of the certStatus
    alternative, thisUpdate), the last three taken from the response at
    position 0.
    """
    status = ocspResponse.getComponentByName('responseStatus')
    assert status == rfc2560.OCSPResponseStatus('successful'), status.prettyPrint()

    payload = ocspResponse.getComponentByName('responseBytes')
    payloadType = payload.getComponentByName('responseType')
    assert payloadType == rfc2560.id_pkix_ocsp_basic, payloadType.prettyPrint()

    # The 'response' component is itself DER and needs a second decode.
    encodedBasic = payload.getComponentByName('response')
    decoded = decoder.decode(encodedBasic, asn1Spec=rfc2560.BasicOCSPResponse())
    basicResponse = decoded[0]

    responseData = basicResponse.getComponentByName('tbsResponseData')
    firstSingle = responseData.getComponentByName('responses').getComponentByPosition(0)

    producedAt = responseData.getComponentByName('producedAt')
    certId = firstSingle.getComponentByName('certID')
    certStatusName = firstSingle.getComponentByName('certStatus').getName()
    thisUpdate = firstSingle.getComponentByName('thisUpdate')
    return (producedAt, certId, certStatusName, thisUpdate)
def decode(data: bytes) -> KkdcpRequest:
    """Decode a KDC-PROXY-MESSAGE.

    Raises ParserError("Invalid request") when pyasn1 cannot decode
    ``data`` or when bytes remain after the decoded message.
    """
    try:
        parsed, leftover = decoder.decode(data, asn1Spec=model.KdcProxyMessage())
    except error.PyAsn1Error:
        raise ParserError("Invalid request")

    # decode() returns whatever follows the message as its second item;
    # anything left over is treated as a malformed request.
    if leftover:
        raise ParserError("Invalid request")

    kerb_message = parsed.getComponentByName('kerb-message').asOctets()
    target_domain = parsed.getComponentByName('target-domain').asOctets()

    # TODO: Check if the request is valid here

    return KkdcpRequest(kerb_message, target_domain)
def get_subj_alt_name(peer_cert):
    """Collect the dNSName entries of a certificate's subjectAltName.

    Returns a list of strings; the list is empty when
    SUBJ_ALT_NAME_SUPPORT is false or no matching entry is present.
    """
    # Search through extensions
    dns_name = []
    if not SUBJ_ALT_NAME_SUPPORT:
        return dns_name

    general_names = SubjectAltName()
    for index in range(peer_cert.get_extension_count()):
        extension = peer_cert.get_extension(index)
        if extension.get_short_name() == b'subjectAltName':
            # PyOpenSSL returns extension data in ASN.1 encoded form
            encoded = extension.get_data()
            decoded_pair = der_decoder.decode(encoded, asn1Spec=general_names)

            # decode() yields (value, remaining bytes); the isinstance test
            # keeps only the decoded SubjectAltName.
            for part in decoded_pair:
                if isinstance(part, SubjectAltName):
                    for position in range(len(part)):
                        general_name = part.getComponentByPosition(position)
                        if general_name.getName() == 'dNSName':
                            dns_name.append(str(general_name.getComponent()))

    return dns_name
def from_string(cls, key_pem, is_x509_cert):
    """Construct an RsaVerifier instance from a string.

    Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                      is expected to be an RSA key in PEM format.

    Returns:
        RsaVerifier instance.

    Raises:
        ValueError: if the key_pem can't be parsed. In either case, error
                    will begin with 'No PEM start marker'. If
                    ``is_x509_cert`` is True, will fail to find the
                    "-----BEGIN CERTIFICATE-----" error, otherwise fails
                    to find "-----BEGIN RSA PUBLIC KEY-----". Also raised
                    with 'Unused bytes' when data follows the decoded
                    certificate.
    """
    key_pem = _to_bytes(key_pem)

    # Plain RSA public key: nothing to unwrap.
    if not is_x509_cert:
        return cls(rsa.PublicKey.load_pkcs1(key_pem, 'PEM'))

    # X.509 certificate: pull the subjectPublicKey out of the certificate.
    cert_der = rsa.pem.load_pem(key_pem, 'CERTIFICATE')
    certificate, trailing = decoder.decode(cert_der, asn1Spec=Certificate())
    if trailing != b'':
        raise ValueError('Unused bytes', trailing)

    spki = certificate['tbsCertificate']['subjectPublicKeyInfo']
    public_key_der = _bit_list_to_bytes(spki['subjectPublicKey'])
    return cls(rsa.PublicKey.load_pkcs1(public_key_der, 'DER'))
def from_string(cls, key, password='notasecret'):
    """Construct an RsaSigner instance from a string.

    Args:
        key: string, private key in PEM format.
        password: string, password for private key file. Unused for PEM
                  files.

    Returns:
        RsaSigner instance.

    Raises:
        ValueError if the key cannot be parsed as PKCS#1 or PKCS#8 in
        PEM format.
    """
    key = _from_bytes(key)  # pem expects str in Py3
    marker_id, key_bytes = pem.readPemBlocksFromFile(
        six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER)

    # marker_id is the position of the matched marker in the argument list
    # above: 0 for the PKCS#1 marker, 1 for the PKCS#8 marker.
    if marker_id == 0:
        return cls(rsa.key.PrivateKey.load_pkcs1(key_bytes, format='DER'))

    if marker_id != 1:
        raise ValueError('No key could be detected.')

    # PKCS#8: the PKCS#1 key is carried in the 'privateKey' component.
    pkcs8_info, trailing = decoder.decode(key_bytes, asn1Spec=_PKCS8_SPEC)
    if trailing != b'':
        raise ValueError('Unused bytes', trailing)

    wrapped_key = pkcs8_info.getComponentByName('privateKey')
    return cls(rsa.key.PrivateKey.load_pkcs1(wrapped_key.asOctets(),
                                             format='DER'))
def get_subj_alt_name(peer_cert):
    """Collect the dNSName entries of a certificate's subjectAltName.

    Returns a list of strings; the list is empty when
    SUBJ_ALT_NAME_SUPPORT is false or no matching entry is present.
    """
    # Search through extensions
    dns_name = []
    if not SUBJ_ALT_NAME_SUPPORT:
        return dns_name

    general_names = SubjectAltName()
    for i in range(peer_cert.get_extension_count()):
        ext = peer_cert.get_extension(i)
        ext_name = ext.get_short_name()
        # get_short_name() returns bytes, so compare against a bytes
        # literal: on Python 3 a text literal never matches, while on
        # Python 2 b'...' and '...' are the same type.
        if ext_name != b'subjectAltName':
            continue

        # PyOpenSSL returns extension data in ASN.1 encoded form
        ext_dat = ext.get_data()
        decoded_dat = der_decoder.decode(ext_dat, asn1Spec=general_names)

        # decode() yields (value, remaining bytes); the isinstance test
        # keeps only the decoded SubjectAltName.
        for name in decoded_dat:
            if not isinstance(name, SubjectAltName):
                continue
            for entry in range(len(name)):
                component = name.getComponentByPosition(entry)
                if component.getName() != 'dNSName':
                    continue
                dns_name.append(str(component.getComponent()))

    return dns_name
def _decrypt_rep(data, key, spec, enc_spec, msg_type):
    """Decode a reply and decrypt/decode its 'enc-part'.

    Returns ``(rep, decoded_enc_part)``. When the decrypted text begins
    with the 20-character marker below, returns
    ``(decrypted[20:22], None)`` instead.
    """
    rep = decode(data, asn1Spec=spec)[0]

    cipher_text = str(rep['enc-part']['cipher'])
    decrypted = decrypt(key[0], key[1], msg_type, cipher_text)

    # MAGIC
    marker = '31337313373133731337'
    if decrypted[:20] == marker:
        return decrypted[20:22], None

    enc_part = decode(decrypted, asn1Spec=enc_spec)[0]
    return rep, enc_part
def _extract_data(data, spec):
    """Decode ``data`` against ``spec`` and return only the decoded value."""
    # decode() returns a sequence whose first item is the decoded value.
    decoded = decode(data, asn1Spec=spec)
    return decoded[0]
    # NOTE(review): the source carried a trailing comment "used in implicit
    # authentication" here; unclear whether it describes this helper or the
    # definition that followed it.
def decrypt_ticket_enc_part(ticket, key):
    """Decrypt a ticket's 'enc-part' cipher and decode it as EncTicketPart."""
    cipher_text = str(ticket['enc-part']['cipher'])

    # The literal 2 is the third argument to decrypt() (presumably the
    # Kerberos key-usage number for tickets -- confirm against decrypt()).
    plain_text = decrypt(key[0], key[1], 2, cipher_text)

    decoded = decode(plain_text, asn1Spec=EncTicketPart())
    return decoded[0]
def iter_authorization_data(ad):
    """Yield every authorization-data block in ``ad``, depth first.

    Each block is yielded as-is; when its 'ad-type' equals AD_IF_RELEVANT
    its 'ad-data' is decoded as AuthorizationData and the nested blocks are
    yielded right after it. Yields nothing when ``ad`` is None.
    """
    if ad is None:
        return

    for entry in ad:
        yield entry

        if entry['ad-type'] == AD_IF_RELEVANT:
            nested_raw = str(entry['ad-data'])
            nested = decode(nested_raw, asn1Spec=AuthorizationData())[0]
            for inner in iter_authorization_data(nested):
                yield inner
def decode_dss_signature(signature):
    """Decode a DER-encoded DSS signature into its ``(r, s)`` integers.

    Raises ValueError when the data cannot be decoded as ASN.1 or when
    bytes follow the decoded sequence.
    """
    try:
        decoded, trailing = decoder.decode(signature, asn1Spec=_DSSSigValue())
    except PyAsn1Error:
        raise ValueError("Invalid signature data. Unable to decode ASN.1")

    if trailing:
        raise ValueError(
            "The signature contains bytes after the end of the ASN.1 sequence."
        )

    return (
        int(decoded.getComponentByName('r')),
        int(decoded.getComponentByName('s')),
    )
def _get_subj_alt_name(cls, peer_cert):
    '''Extract subjectAltName DNS name settings from certificate extensions

    Only ``dNSName`` entries are returned; other GeneralName alternatives
    in the extension (e-mail addresses, URIs, IP addresses, ...) are
    skipped so that they are never treated as host names.

    @param peer_cert: peer certificate in SSL connection.  subjectAltName
    settings if any will be extracted from this
    @type peer_cert: OpenSSL.crypto.X509
    @return: DNS names found in the subjectAltName extension(s); empty if
    there are none
    @rtype: list
    '''
    # Search through extensions
    dns_name = []
    general_names = SubjectAltName()
    for i in range(peer_cert.get_extension_count()):
        ext = peer_cert.get_extension(i)
        ext_name = ext.get_short_name()
        if ext_name != cls.SUBJ_ALT_NAME_EXT_NAME:
            continue

        # PyOpenSSL returns extension data in ASN.1 encoded form
        ext_dat = ext.get_data()
        decoded_dat = der_decoder.decode(ext_dat, asn1Spec=general_names)

        # decode() yields (value, remaining bytes); the isinstance test
        # keeps only the decoded SubjectAltName.
        for name in decoded_dat:
            if not isinstance(name, SubjectAltName):
                continue
            for entry in range(len(name)):
                component = name.getComponentByPosition(entry)
                if component.getName() != 'dNSName':
                    continue
                dns_name.append(str(component.getComponent()))

    return dns_name
def from_string(cls, key_pem, is_x509_cert):
    """Construct an RsaVerifier instance from a string.

    Args:
        key_pem: string, public key in PEM format.
        is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it
                      is expected to be an RSA key in PEM format.

    Returns:
        RsaVerifier instance.

    Raises:
        ValueError: if the key_pem can't be parsed. In either case, error
                    will begin with 'No PEM start marker'. If
                    ``is_x509_cert`` is True, will fail to find the
                    "-----BEGIN CERTIFICATE-----" error, otherwise fails
                    to find "-----BEGIN RSA PUBLIC KEY-----". Also raised
                    with 'Unused bytes' when data follows the decoded
                    certificate.
    """
    key_pem = _helpers._to_bytes(key_pem)

    # Plain RSA public key: nothing to unwrap.
    if not is_x509_cert:
        return cls(rsa.PublicKey.load_pkcs1(key_pem, 'PEM'))

    # X.509 certificate: pull the subjectPublicKey out of the certificate.
    cert_der = rsa.pem.load_pem(key_pem, 'CERTIFICATE')
    certificate, trailing = decoder.decode(cert_der, asn1Spec=Certificate())
    if trailing != b'':
        raise ValueError('Unused bytes', trailing)

    spki = certificate['tbsCertificate']['subjectPublicKeyInfo']
    public_key_der = _bit_list_to_bytes(spki['subjectPublicKey'])
    return cls(rsa.PublicKey.load_pkcs1(public_key_der, 'DER'))