我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 pyasn1.type.univ.ObjectIdentifier()。
def load_pkcs1_openssl_der(cls, keyfile):
    '''Load a public key from OpenSSL's DER-encoded public key format.

    The input is decoded against the ``OpenSSLPubKey`` ASN.1 spec, the OID in
    its header is compared with 1.2.840.113549.1.1.1, and the embedded key
    (minus its first element) is passed on to ``cls._load_pkcs1_der``.

    @param keyfile: contents of a DER-encoded file that contains the public
        key, from OpenSSL.
    @return: the result of ``cls._load_pkcs1_der`` (a PublicKey object
        according to the original documentation).
    @raise TypeError: if the header OID is not 1.2.840.113549.1.1.1.
    '''
    from rsa.asn1 import OpenSSLPubKey
    from pyasn1.codec.der import decoder
    from pyasn1.type import univ

    decoded_key, _remainder = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())

    header_oid = decoded_key['header']['oid']
    expected_oid = univ.ObjectIdentifier('1.2.840.113549.1.1.1')
    if header_oid != expected_oid:
        raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")

    # The leading element of the 'key' component is skipped before parsing.
    embedded_key = decoded_key['key'][1:]
    return cls._load_pkcs1_der(embedded_key)
def _snmpFormatValue(v):
    """Convert an SNMP value object into a plain ``int`` or ``str``.

    Dispatch is on ``type(v)``:

    * exactly ``NoSuchObject``: a fixed placeholder string;
    * integer-like types (TimeTicks, Integer, Integer32, Gauge32, Counter32,
      Counter64, Unsigned32): ``int(v)``;
    * ObjectIdentity, ObjectIdentifier, IpAddress: ``str(v.prettyPrint())``;
    * OctetString: ``prettyPrint()`` of the value, after dropping one
      trailing zero byte when that makes the pretty-printed form equal to
      the plain ``str`` form;
    * anything else: a placeholder string followed by the type.
    """
    _type = type(v)

    if _type is NoSuchObject:
        return "?????? ???????? OID"

    if issubclass(_type, (TimeTicks, Integer, Integer32, Gauge32,
                          Counter32, Counter64, Unsigned32)):
        return int(v)

    if issubclass(_type, (ObjectIdentity, ObjectIdentifier, IpAddress)):
        return str(v.prettyPrint())

    if issubclass(_type, (OctetString,)):
        printable = v.prettyPrint()
        plain = str(v)
        # Sometimes prettyPrint returns hex values instead of a string because
        # of a trailing 00 byte. If the two forms disagree, try removing that
        # trailing byte; if they agree afterwards, use the shortened value.
        if printable != plain and len(v) > 0 and v[len(v) - 1] == 0:
            trimmed = v[:len(v) - 1]
            trimmed_printable = trimmed.prettyPrint()
            trimmed_plain = str(trimmed)
            if trimmed_printable == trimmed_plain:
                v = trimmed
        return v.prettyPrint()

    return "??????????? ??? ?????? " + str(_type)
def load_pkcs1_openssl_der(cls, keyfile):
    """Parse an OpenSSL DER-encoded public key and build a key from it.

    ``keyfile`` is decoded with the ``OpenSSLPubKey`` ASN.1 spec. The OID
    stored in the decoded header must equal 1.2.840.113549.1.1.1; the 'key'
    component, without its first element, is then handed to
    ``cls._load_pkcs1_der``.

    :param keyfile: contents of a DER-encoded file that contains the public
        key, from OpenSSL.
    :return: the result of ``cls._load_pkcs1_der`` (a PublicKey object
        according to the original documentation).
    :raise TypeError: if the header OID does not match.
    """
    from rsa.asn1 import OpenSSLPubKey
    from pyasn1.codec.der import decoder
    from pyasn1.type import univ

    (pubkey_info, _unused) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())

    oid_mismatch = pubkey_info['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1')
    if oid_mismatch:
        raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")

    return cls._load_pkcs1_der(pubkey_info['key'][1:])
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in an encoded ``ContextToken`` / ``NegTokenInit``.

    The resulting structure is:

    * ``ContextToken.thisMech`` = OID 1.3.6.1.5.5.2
    * ``ContextToken.innerContextToken`` = ``NegotiationToken`` whose
      ``negTokenInit`` carries
        - ``mechTypes``: a one-entry list holding OID 1.3.6.1.4.1.311.2.2.10
        - ``mechToken``: *ntlm_data* as an octet string

    :param ntlm_data: payload placed into the ``mechToken`` field.
    :return: the output of ``encoder.encode`` for the assembled token.
    """
    def context_tag(tag_format, number):
        # All explicit tags used here are context-class tags.
        return tag.Tag(tag.tagClassContext, tag_format, number)

    token_field = univ.OctetString(ntlm_data).subtype(
        explicitTag=context_tag(tag.tagFormatSimple, 2))

    mechanisms = MechTypeList().subtype(
        explicitTag=context_tag(tag.tagFormatConstructed, 0))
    mechanisms.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))

    init_token = NegTokenInit().subtype(
        explicitTag=context_tag(tag.tagFormatConstructed, 0))
    init_token.setComponentByName('mechTypes', mechanisms)
    init_token.setComponentByName('mechToken', token_field)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', init_token)

    context = ContextToken()
    context.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    context.setComponentByName('innerContextToken', negotiation)

    return encoder.encode(context)
def ssh_gss_oids(self, mode="client"):
    """
    Return the single supported mechanism OID in wire form.

    Only one OID (``self._krb5_mech``) is ever reported; the original
    documentation identifies it as the Kerberos V5 mechanism.

    :param str mode: ``"server"`` for server mode; any other value is
                     treated as client mode
    :return: In client mode: the OID count (1), the length of the encoded
             OID and the encoded OID itself, concatenated. In server mode
             the leading count is omitted.
    :rtype: Bytes
    """
    oid_count = self._make_uint32(1)
    encoded_mech = encoder.encode(ObjectIdentifier(self._krb5_mech))
    encoded_len = self._make_uint32(len(encoded_mech))

    payload = encoded_len + encoded_mech
    if mode != "server":
        # Clients additionally announce how many OIDs follow.
        payload = oid_count + payload
    return payload
def oidToMibName(mibView, oid):
    """Resolve *oid* to a ``(symName, modName)`` pair plus an index/suffix.

    :param mibView: object providing ``getNodeNameByOid``,
        ``getNodeLocation`` and a ``mibBuilder`` with ``importSymbols``.
    :param oid: a tuple, or anything ``univ.ObjectIdentifier`` accepts.
    :return: ``((symName, modName), indices)`` where ``indices`` is

        * the row node's ``getIndicesFromInstId(suffix)`` when the resolved
          node has a ``createTest`` attribute (table column),
        * the (empty) suffix itself when there is no suffix,
        * ``__scalarSuffix`` when the suffix equals ``(0,)``.
    :raise NoSuchObjectError: for any other suffix.
    """
    if not isinstance(oid, tuple):
        oid = tuple(univ.ObjectIdentifier(oid))

    node_oid, _label, suffix = mibView.getNodeNameByOid(oid)
    modName, symName, _node_suffix = mibView.getNodeLocation(node_oid)
    (mibNode,) = mibView.mibBuilder.importSymbols(modName, symName)

    if hasattr(mibNode, 'createTest'):
        # Table column: the parent OID locates the row that decodes indices.
        row_mod, row_sym, _row_suffix = mibView.getNodeLocation(node_oid[:-1])
        (rowNode,) = mibView.mibBuilder.importSymbols(row_mod, row_sym)
        return (symName, modName), rowNode.getIndicesFromInstId(suffix)

    if not suffix:
        # Scalar referenced without an instance suffix.
        return (symName, modName), suffix

    if suffix == (0,):
        # Scalar instance.
        return (symName, modName), __scalarSuffix

    raise NoSuchObjectError(
        str='No MIB registered that defines %s object, closest known parent is %s (%s::%s)' % (
            univ.ObjectIdentifier(oid),
            univ.ObjectIdentifier(mibNode.name),
            modName,
            symName,
        )
    )

# Value
def oidToMibName(mibView, oid):
    """Map an OID onto its MIB symbol and instance index.

    The OID is normalised to a tuple, looked up through *mibView*, and the
    defining MIB node is imported. Depending on the node and on what is left
    of the OID after the node's own name, the second element of the result
    is the decoded table indices, the empty suffix, or ``__scalarSuffix``.

    :param mibView: MIB view used for the OID and symbol lookups.
    :param oid: a tuple, or a value accepted by ``univ.ObjectIdentifier``.
    :return: ``((symName, modName), indices_or_suffix)``.
    :raise NoSuchObjectError: if the node is not a table column and the
        remaining suffix is neither empty nor ``(0,)``.
    """
    if not isinstance(oid, tuple):
        oid = tuple(univ.ObjectIdentifier(oid))

    resolved_oid, _label, suffix = mibView.getNodeNameByOid(oid)
    modName, symName, _ignored = mibView.getNodeLocation(resolved_oid)
    mibNode, = mibView.mibBuilder.importSymbols(modName, symName)

    name_pair = (symName, modName)

    if hasattr(mibNode, 'createTest'):  # table column
        parent_mod, parent_sym, _ignored = mibView.getNodeLocation(resolved_oid[:-1])
        rowNode, = mibView.mibBuilder.importSymbols(parent_mod, parent_sym)
        indices = rowNode.getIndicesFromInstId(suffix)
    elif not suffix:  # scalar
        indices = suffix
    elif suffix == (0,):  # scalar
        indices = __scalarSuffix
    else:
        message = 'No MIB registered that defines %s object, closest known parent is %s (%s::%s)' % (
            univ.ObjectIdentifier(oid), univ.ObjectIdentifier(mibNode.name), modName, symName)
        raise NoSuchObjectError(str=message)

    return name_pair, indices

# Value
def ssh_gss_oids(self, mode="client"):
    """
    Build the OID announcement for the one supported mechanism.

    The mechanism OID is taken from ``self._krb5_mech`` (the Kerberos V5
    mechanism according to the original documentation) and encoded with the
    module's ``encoder``.

    :param str mode: Client for client mode and server for server mode
    :return: A byte sequence containing the number of supported OIDs, the
             length of the OID and the actual encoded OID
    :note: In server mode only the OID length and the encoded OID are
           returned.
    """
    count_field = self._make_uint32(1)
    mech_bytes = encoder.encode(ObjectIdentifier(self._krb5_mech))
    length_field = self._make_uint32(len(mech_bytes))

    server_reply = length_field + mech_bytes
    return server_reply if mode == "server" else count_field + server_reply
def is_letsencrypt_cert(ee_cert):
    '''Return True if the issuer of ee_cert names "Let's Encrypt".

    Every attribute in the issuer's ``rdnSequence`` whose type is
    OID 2.5.4.10 is converted to ``str``, passed through
    ``string_without_prefix('\\x13\\r', ...)`` (presumably removing that
    leading prefix; the helper is defined elsewhere) and compared with
    "Let's Encrypt".

    Args:
        ee_cert (EndEntityCert)

    Returns:
        True on the first matching attribute, False if the issuer is empty
        or nothing matches.
    '''
    organization_name_oid = ObjectIdentifier(value='2.5.4.10')
    issuer = ee_cert.tbscert.pyasn1['issuer']
    if not issuer:
        return False

    for rdn in issuer['rdnSequence']:
        for attribute in rdn:
            if not attribute.getComponentByName('type') == organization_name_oid:
                continue
            raw_name = str(attribute.getComponentByName('value'))
            if string_without_prefix('\x13\r', raw_name) == "Let's Encrypt":
                return True
    return False
def tbscert_without_ct_extensions(tbscert):
    '''Return a copy of `tbscert` without its Certificate Transparency
    extensions.

    Two extensions are removed, if present:

    * the SCT list extension (OID 1.3.6.1.4.1.11129.2.4.2), and
    * the poison extension (OID 1.3.6.1.4.1.11129.2.4.3).

    All other extensions are kept in their original order. `tbscert` itself
    is not modified by this function: the filtered extensions are stored on
    the object returned by `copy_pyasn1_instance`.

    Args:
        tbscert: the certificate body to filter (a
            pyasn1_modules.rfc5280.TBSCertificate instance according to the
            original documentation).

    Returns:
        The copy produced by `copy_pyasn1_instance(tbscert)` with its
        'extensions' component replaced by the filtered extensions.
    '''
    sctlist_oid = ObjectIdentifier(value='1.3.6.1.4.1.11129.2.4.2')
    poison_oid = ObjectIdentifier(value='1.3.6.1.4.1.11129.2.4.3')
    ct_oids = (sctlist_oid, poison_oid)

    extensions = tbscert['extensions']
    # subtype() with no arguments gives a fresh container of the same type to
    # collect the surviving extensions in.
    without_ct_extensions = extensions.subtype()
    for extension in extensions:
        if extension['extnID'] not in ct_oids:
            without_ct_extensions.append(extension)

    # Named `stripped` rather than `copy` to avoid shadowing the stdlib module.
    stripped = copy_pyasn1_instance(tbscert)
    stripped['extensions'] = without_ct_extensions
    return stripped
def load_pkcs1_openssl_der(cls, keyfile):
    """Loads a PKCS#1 DER-encoded public key file from OpenSSL.

    The data is decoded against the ``OpenSSLPubKey`` ASN.1 spec and the OID
    in its header is checked before the embedded key is parsed.

    :param keyfile: contents of a DER-encoded file that contains the public
        key, from OpenSSL.
    :return: a PublicKey object, as built by ``cls._load_pkcs1_der``
    :raise TypeError: if the header OID is not 1.2.840.113549.1.1.1
    """
    from rsa.asn1 import OpenSSLPubKey
    from pyasn1.codec.der import decoder
    from pyasn1.type import univ

    (keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())

    if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
        raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")

    # The first element of the 'key' component is skipped; the rest is the
    # key data handed to the PKCS#1 loader.
    return cls._load_pkcs1_der(keyinfo['key'][1:])
def set(self, *oidvalues):
    """Issue an SNMP set for the given ``(oid, value)`` pairs.

    Each pair is translated according to the form of its oid:

    * tuple containing at least one ``str``: wrapped in
      ``cmdgen.MibVariable(*oid)``; the value must be an ``int``, ``str`` or
      ``bool``;
    * tuple without strings: passed through unchanged; unless the last
      element is 0 the value must be a pyasn1 Integer, OctetString or
      ObjectIdentifier;
    * ``str``: converted with ``nodeid(oid)``; unless it ends in ".0" the
      value must be one of the same pyasn1 types;
    * anything else: the pair is silently skipped.

    The type requirements are enforced with ``assert`` statements.

    :param oidvalues: ``(oid, value)`` pairs to set.
    :return: ``SnmpVarBinds`` wrapping the returned varbinds.
    :raise SnmpError: if the command reports an error indication or status.
    """
    assert self.alive is True

    translated = []
    for oid, value in oidvalues:
        if isinstance(oid, tuple):
            if any(isinstance(entry, str) for entry in oid):
                # A tuple containing strings is assumed to need translation
                # through cmdgen.MibVariable; value must then be a Python type.
                assert isinstance(value, (int, str, bool))
                translated.append((cmdgen.MibVariable(*oid), value))
            else:
                # value must be a rfc1902/pyasn1 type
                if not oid[-1] == 0:
                    assert isinstance(value, (univ.Integer, univ.OctetString, univ.ObjectIdentifier))
                translated.append((oid, value))
        elif isinstance(oid, str):
            # A string oid is assumed to be a nodeid lookup; value must then
            # be a rfc1902/pyasn1 type, if oid is not a scalar.
            if not oid.endswith(".0"):
                assert isinstance(value, (univ.Integer, univ.OctetString, univ.ObjectIdentifier))
            translated.append((nodeid(oid), value))

    generator = cmdgen.CommandGenerator()
    response = generator.setCmd(
        self.auth,
        cmdgen.UdpTransportTarget((self.host, self.port), timeout=self.timeout, retries=self.retries),
        *translated
    )
    (error_indication, error_status, error_index, varbinds) = response  # pylint: disable=W0612

    if error_indication or error_status:
        self.__set_error(error_indication, error_status, error_index, varbinds)
        raise SnmpError("SNMP set command on %s of oid values %r failed" % (self.host, translated),
                        error_indication, error_status, error_index, varbinds)
    return SnmpVarBinds(varbinds)
def __get_json(self, keytype=str):
    """Return ``self.get_dict()`` converted to plain Python values.

    OctetString and ObjectIdentifier values become ``str``, Integer values
    become ``int``; keys are converted with *keytype*.

    :param keytype: callable applied to every key (``str`` by default).
    :return: a new dict of converted keys and values.
    :raise AssertionError: if a value is none of the three pyasn1 types.
    """
    entries = list(self.get_dict().items())

    # Checked in this order; the first matching type decides the conversion.
    conversions = (
        (univ.OctetString, str),
        (univ.Integer, int),
        (univ.ObjectIdentifier, str),
    )

    result = {}
    for key, value in entries:
        for asn1_type, convert in conversions:
            if isinstance(value, asn1_type):
                plain_value = convert(value)
                break
        else:
            raise AssertionError("Unknown type %s encountered for oid %s" % (value.__class__.__name__, key))
        result[keytype(key)] = plain_value
    return result
def _buildOid(*components):
    """Concatenate *components* into one ``univ.ObjectIdentifier``.

    Components that are themselves ObjectIdentifier instances contribute all
    of their elements; every other component is converted with ``int`` and
    contributes a single element.
    """
    arcs = []
    for part in components:
        if not isinstance(part, univ.ObjectIdentifier):
            arcs.append(int(part))
            continue
        arcs += list(part)
    return univ.ObjectIdentifier(arcs)
def _OID(*components):
    """Build a ``univ.ObjectIdentifier`` from mixed components.

    An ObjectIdentifier component is spliced in element by element; any
    other component is passed through ``int`` and added as one element.
    """
    flattened = []
    for item in components:
        is_oid = isinstance(item, univ.ObjectIdentifier)
        flattened.extend(list(item) if is_oid else [int(item)])
    return univ.ObjectIdentifier(flattened)
def OidFromAttid(prefixTable, attr):
    """Translate the numeric attribute id *attr* into a dotted OID string.

    *attr* is split into an upper and a lower 16-bit word. The upper word
    selects an entry of *prefixTable* by its ``'ndx'``; the lower word is
    appended to that entry's binary prefix as one byte (values below 128) or
    two bytes (otherwise). The result is decoded as an ObjectIdentifier.

    :param prefixTable: iterable of entries exposing ``item['ndx']``,
        ``item['prefix']['elements']`` and ``item['prefix']['length']``.
        NOTE(review): assumes ``elements`` is a list of one-byte strings
        (``bytes`` on Python 3) -- confirm against the caller.
    :param attr: integer attribute id.
    :return: the OID as a string, or ``None`` if no table entry matches.
    """
    def _byte(value):
        # One-byte string on both Python 2 and 3; out-of-range values raise
        # ValueError, as chr() did on Python 2.
        return bytes(bytearray((value,)))

    # Separate the attribute id into two parts. Floor division is required:
    # with true division the upper word becomes a float that never equals an
    # integer 'ndx' once the lower word is non-zero.
    upperWord = attr // 65536
    lowerWord = attr % 65536

    # Search the prefix table for upperWord; if found, construct the binary
    # OID by appending lowerWord to the end of the found prefix.
    binaryOID = None
    for item in prefixTable:
        if item['ndx'] == upperWord:
            # Slicing copies the prefix, so the table entry is not modified.
            binaryOID = item['prefix']['elements'][:item['prefix']['length']]
            if lowerWord < 128:
                binaryOID.append(_byte(lowerWord))
            else:
                if lowerWord >= 32768:
                    lowerWord -= 32768
                binaryOID.append(_byte(((lowerWord // 128) % 128) + 128))
                binaryOID.append(_byte(lowerWord % 128))
            break

    if binaryOID is None:
        return None

    # Tag byte 0x06, a one-byte length, then the OID content bytes.
    encoded = b'\x06' + _byte(len(binaryOID)) + b''.join(binaryOID)
    return str(decoder.decode(encoded, asn1Spec=univ.ObjectIdentifier())[0])
def test_read_system_oid(self):
    """The value mapped to ``sysObjectID`` equals the ``baytech_mrp27.sBTA`` OID."""
    expected_oid = univ.ObjectIdentifier(baytech_mrp27.sBTA)
    reported_oid = self.pdu.oid_mapping[sysObjectID].value
    self.assertEqual(expected_oid, reported_oid)