我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 pyasn1.type.univ.OctetString()。
def prettyIn(self, value):
    """Coerce *value* to a ``unicode`` object (Python 2 flavour).

    ``unicode`` input is returned untouched, ``str`` input is decoded with
    ``self.encoding``, a tuple/list of integer code points is turned into a
    byte string and re-processed, an ``univ.OctetString`` is decoded from
    its octets, and anything else goes through ``unicode()``.

    Raises ``error.PyAsn1Error`` when decoding fails or the codec named by
    ``self.encoding`` cannot be found.
    """
    try:
        if isinstance(value, unicode):
            return value
        if isinstance(value, str):
            return value.decode(self.encoding)
        if isinstance(value, (tuple, list)):
            # Sequence of small integers -> byte string, then start over.
            packed = ''.join(map(chr, value))
            return self.prettyIn(packed)
        if isinstance(value, univ.OctetString):
            return value.asOctets().decode(self.encoding)
        return unicode(value)

    except (UnicodeDecodeError, LookupError):
        raise error.PyAsn1Error(
            "Can't decode string '%s' with codec %s" % (value, self.encoding)
        )
def prettyIn(self, value):
    """Coerce *value* to a ``str`` object (Python 3 flavour).

    ``str`` input is returned untouched, ``bytes`` input is decoded with
    ``self.encoding``, a tuple/list of integers is packed into ``bytes``
    and re-processed, an ``univ.OctetString`` is decoded from its octets,
    and anything else goes through ``str()``.

    Raises ``error.PyAsn1Error`` when decoding fails or the codec named by
    ``self.encoding`` cannot be found.
    """
    try:
        if isinstance(value, str):
            return value
        if isinstance(value, bytes):
            return value.decode(self.encoding)
        if isinstance(value, (tuple, list)):
            # Sequence of octet values -> bytes, then start over.
            packed = bytes(value)
            return self.prettyIn(packed)
        if isinstance(value, univ.OctetString):
            return value.asOctets().decode(self.encoding)
        return str(value)

    except (UnicodeDecodeError, LookupError):
        raise error.PyAsn1Error(
            "Can't decode string '%s' with codec %s" % (value, self.encoding)
        )
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* into an encoded negotiation-init context token.

    The payload becomes the ``mechToken`` (explicit context tag 2) of a
    ``NegTokenInit`` whose ``mechTypes`` list holds the single OID
    ``1.3.6.1.4.1.311.2.2.10``.  That structure is placed in a
    ``NegotiationToken`` and finally in a ``ContextToken`` whose
    ``thisMech`` is ``1.3.6.1.5.5.2``.  The result of
    ``encoder.encode()`` on the context token is returned.
    """
    # NOTE(review): the OIDs look like NTLMSSP and SPNEGO respectively.
    token_tag = tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
    mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=token_tag)

    types_tag = tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
    mech_types = MechTypeList().subtype(explicitTag=types_tag)
    mech_types.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    init_tag = tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
    neg_init = NegTokenInit().subtype(explicitTag=init_tag)
    neg_init.setComponentByName('mechTypes', mech_types)
    neg_init.setComponentByName('mechToken', mech_token)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', neg_init)

    context = ContextToken()
    context.setComponentByName(
        'thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2')
    )
    context.setComponentByName('innerContextToken', negotiation)

    return encoder.encode(context)
def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
    """Return the key material for encryption together with a fresh salt.

    The salt is the current value of ``self._localInt`` split into eight
    octets, most significant first.  The counter is then advanced by one,
    wrapping to 0 after ``0xffffffffffffffff``.  The return value is the
    tuple produced by ``__getDecryptionKey`` with the salt octets appended
    as a third element.
    """
    counter = self._localInt

    # Big-endian split of the 64-bit counter: shifts 56, 48, ..., 8, 0.
    salt = [counter >> shift & 0xff for shift in range(56, -8, -8)]

    if counter == 0xffffffffffffffff:
        self._localInt = 0
    else:
        self._localInt = counter + 1

    keyAndIv = self.__getDecryptionKey(
        privKey, snmpEngineBoots, snmpEngineTime, salt
    )
    saltOctets = univ.OctetString(salt).asOctets()
    return keyAndIv + (saltOctets,)
def __getDecryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime, salt):
    """Return ``(key octets, IV octets)`` for the given engine state.

    The key is the first ``self.keySize`` elements of *privKey*.  The IV is
    the four big-endian octets of *snmpEngineBoots*, followed by the four
    big-endian octets of *snmpEngineTime*, followed by *salt* (which must
    be a list, as it is concatenated with one).
    """
    boots = int(snmpEngineBoots)
    ticks = int(snmpEngineTime)
    shifts = (24, 16, 8, 0)

    ivPrefix = [boots >> s & 0xff for s in shifts]
    ivPrefix += [ticks >> s & 0xff for s in shifts]
    iv = ivPrefix + salt

    keyOctets = privKey[:self.keySize].asOctets()
    return keyOctets, univ.OctetString(iv).asOctets()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with DES in CBC mode.

    *privParameters* must unpack into three items; only the first (engine
    boots) is used here.  Returns a pair of ``univ.OctetString`` objects:
    the ciphertext and the salt to be sent as privacy parameters.

    Raises ``error.StatusInformation`` with ``errind.encryptionError``
    when the ``DES`` implementation is unavailable (``DES is None``).
    """
    # The numbered remarks are the section markers carried over from the
    # original code -- presumably RFC 3414 clauses; TODO confirm.
    if DES is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, _engineTime, _ignoredSalt = privParameters

    # 8.3.1.1
    desKey, freshSalt, iv = self.__getEncryptionKey(encryptKey, engineBoots)

    # 8.3.1.2
    saltParameters = univ.OctetString(freshSalt)

    # 8.1.1.2 -- zero padding; a full extra block of 8 zeros is appended
    # when the input length is already a multiple of 8.
    cipher = DES.new(desKey, DES.MODE_CBC, iv)
    padLength = 8 - len(dataToEncrypt) % 8
    padding = univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(dataToEncrypt + padding)

    # 8.3.1.3 & 4
    return univ.OctetString(ciphertext), saltParameters

# 8.2.4.2
def hashPassphrase(passphrase, hashFunc):
    """Digest 16384 consecutive 64-octet chunks of a repeated passphrase.

    *passphrase* is converted to octets via ``univ.OctetString``.  It is
    then treated as an endlessly repeating stream, of which 16384 chunks
    of 64 octets each are fed to a hasher created by calling *hashFunc*
    with no arguments.  The hasher's ``digest()`` is returned.

    An empty passphrase no longer raises ``ZeroDivisionError``: the stream
    is then empty and the digest of empty input is returned.
    """
    passphrase = univ.OctetString(passphrase).asOctets()

    # noinspection PyDeprecation,PyCallingNonCallable
    hasher = hashFunc()

    # The ring buffer must be longer than one 64-octet chunk so that a
    # chunk wraps around its end at most once.  Guard the division so an
    # empty passphrase yields an empty buffer instead of crashing.
    repeats = 64 // len(passphrase) + 1 if passphrase else 1
    ringBuffer = passphrase * repeats

    # noinspection PyTypeChecker
    ringBufferLen = len(ringBuffer)

    mark = 0
    for _ in range(16384):
        e = mark + 64
        if e < ringBufferLen:
            hasher.update(ringBuffer[mark:e])
            mark = e
        else:
            # Chunk runs past the end: take the tail, then wrap around.
            wrapped = e - ringBufferLen
            hasher.update(ringBuffer[mark:ringBufferLen] + ringBuffer[0:wrapped])
            mark = wrapped

    return hasher.digest()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with triple DES in CBC mode.

    *privParameters* must unpack into three items; only the first (engine
    boots) is used here.  Returns a pair of ``univ.OctetString`` objects:
    the ciphertext and the salt to be sent as privacy parameters.

    Raises ``error.StatusInformation`` with ``errind.encryptionError``
    when the ``DES3`` implementation is unavailable (``DES3 is None``).
    """
    if DES3 is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, _engineTime, _ignoredSalt = privParameters

    des3Key, freshSalt, iv = self.__getEncryptionKey(encryptKey, engineBoots)

    cipher = DES3.new(des3Key, DES3.MODE_CBC, iv)
    saltParameters = univ.OctetString(freshSalt)

    # Zero padding; a full extra block of 8 zeros is appended when the
    # input length is already a multiple of 8.
    padLength = 8 - len(dataToEncrypt) % 8
    padding = univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(dataToEncrypt + padding)

    return univ.OctetString(ciphertext), saltParameters

# 5.1.1.3
def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
    """Produce encryption key material plus a newly generated salt.

    ``self._localInt`` is serialised into eight octets (most significant
    first) to form the salt, after which the counter is incremented, or
    reset to 0 once it has reached ``0xffffffffffffffff``.  The tuple
    returned by ``__getDecryptionKey`` is extended with the salt octets.
    """
    current = self._localInt
    salt = []
    for shift in (56, 48, 40, 32, 24, 16, 8, 0):
        salt.append(current >> shift & 0xff)

    # Advance the counter for the next message, wrapping at 64 bits.
    self._localInt = 0 if current == 0xffffffffffffffff else current + 1

    material = self.__getDecryptionKey(
        privKey, snmpEngineBoots, snmpEngineTime, salt
    )
    return material + (univ.OctetString(salt).asOctets(),)
def __getDecryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime, salt):
    """Derive ``(key octets, IV octets)`` from engine counters and salt.

    The key is ``privKey[:self.keySize]`` as octets.  The IV is made of
    the 32-bit big-endian *snmpEngineBoots*, the 32-bit big-endian
    *snmpEngineTime* and then *salt* (a list, since it is concatenated
    with one).
    """
    engineBoots = int(snmpEngineBoots)
    engineTime = int(snmpEngineTime)

    iv = []
    for counter in (engineBoots, engineTime):
        for shift in (24, 16, 8, 0):
            iv.append(counter >> shift & 0xff)
    iv = iv + salt

    return privKey[:self.keySize].asOctets(), univ.OctetString(iv).asOctets()
def hashPassphraseMD5(passphrase):
    """Return the MD5 digest of 16384 64-octet chunks of the passphrase.

    *passphrase* is converted to octets via ``univ.OctetString`` and
    treated as an endlessly repeating stream.  An empty passphrase gives
    an empty stream, i.e. the digest of empty input.
    """
    passphrase = univ.OctetString(passphrase).asOctets()
    digest = md5()

    # Repeat the passphrase until the buffer is longer than one chunk.
    if passphrase:
        repeats = 64 // len(passphrase) + 1
    else:
        repeats = 1
    ring = passphrase * repeats
    ringLen = len(ring)

    start = 0
    for _ in range(16384):
        stop = start + 64
        if stop < ringLen:
            digest.update(ring[start:stop])
            start = stop
        else:
            # Chunk runs past the end: take the tail, then wrap around.
            overflow = stop - ringLen
            digest.update(ring[start:ringLen] + ring[0:overflow])
            start = overflow

    return digest.digest()
def hashPassphraseSHA(passphrase):
    """Return the SHA-1 digest of 16384 64-octet chunks of the passphrase.

    *passphrase* is converted to octets via ``univ.OctetString`` and
    treated as an endlessly repeating stream, of which 16384 chunks of 64
    octets each are hashed.

    An empty passphrase no longer raises ``ZeroDivisionError``: the stream
    is then empty and the digest of empty input is returned.
    """
    passphrase = univ.OctetString(passphrase).asOctets()
    md = sha1()

    # The ring buffer must be longer than one 64-octet chunk so that a
    # chunk wraps around its end at most once.  Guard the division so an
    # empty passphrase yields an empty buffer instead of crashing.
    repeats = 64 // len(passphrase) + 1 if passphrase else 1
    ringBuffer = passphrase * repeats
    ringBufferLen = len(ringBuffer)

    mark = 0
    for _ in range(16384):
        e = mark + 64
        if e < ringBufferLen:
            md.update(ringBuffer[mark:e])
            mark = e
        else:
            # Chunk runs past the end: take the tail, then wrap around.
            wrapped = e - ringBufferLen
            md.update(ringBuffer[mark:ringBufferLen] + ringBuffer[0:wrapped])
            mark = wrapped

    return md.digest()
def test_aes_encryption_consistency(self):
    """Round-trip every test message through AES encrypt and decrypt.

    Each message in ``self.TEST_MSGS`` is wrapped in an ``OctetString``,
    encoded, encrypted with a random 16-byte key and IV, decrypted again
    and compared with the encoded original.
    """
    crypto = self.crypto_obj
    for message in self.TEST_MSGS:
        # Simple ASN.1 structure serving as the plaintext.
        encoded = encode(OctetString(message))

        # Fresh random IV and key for every message.
        iv = urandom(16)
        key = urandom(16)

        encrypted = crypto._Crypto__encrypt_with_aes(encoded, key, iv)
        decrypted = crypto._Crypto__decrypt_with_aes(encrypted, key, iv)

        self.assertEqual(encoded, decrypted)
def probeContext(transportDomain, transportAddress, contextName):
    """Yield candidate context paths, most specific first.

    The path components are the context name, the dotted transport
    domain and, depending on which known domain prefix matches, a
    transport-address part.  Empty components are dropped.  Each yielded
    value is the '/'-separated, normalised path as octets; after every
    yield the last component is removed until none remain.
    """
    dottedDomain = '.'.join(map(str, transportDomain))
    parts = [contextName, dottedDomain]

    if transportDomain[:len(udp.domainName)] == udp.domainName:
        parts.append(transportAddress[0])
    elif udp6 and transportDomain[:len(udp6.domainName)] == udp6.domainName:
        # ':' is replaced so the address can serve as a path component.
        parts.append(str(transportAddress[0]).replace(':', '_'))
    elif unix and transportDomain[:len(unix.domainName)] == unix.domainName:
        parts.append(transportAddress)

    parts = [str(part) for part in parts if part]

    while parts:
        nativePath = os.path.normpath(os.path.sep.join(parts))
        portablePath = nativePath.replace(os.path.sep, '/')
        yield rfc1902.OctetString(portablePath).asOctets()
        parts.pop()

# main script body starts here
def set(self, *oidvalues):
    """Issue an SNMP SET for the given ``(oid, value)`` pairs.

    Accepted OID forms:

    * a tuple containing at least one string -- wrapped in
      ``cmdgen.MibVariable``; the value must be an ``int``, ``str`` or
      ``bool``;
    * a tuple without strings -- used as is; unless its last element
      equals 0 the value must be a pyasn1 ``Integer``, ``OctetString`` or
      ``ObjectIdentifier``;
    * a string -- resolved with ``nodeid()``; unless it ends with ``".0"``
      the same pyasn1 type requirement applies.

    Pairs whose OID is of any other type are ignored.  Type requirements
    are enforced with ``assert``.  Returns ``SnmpVarBinds`` built from the
    response; raises ``SnmpError`` if the command reports an error
    indication or error status.
    """
    assert self.alive is True

    asn1_types = (univ.Integer, univ.OctetString, univ.ObjectIdentifier)
    translated = []

    for oid, value in oidvalues:
        if isinstance(oid, tuple):
            if any(isinstance(entry, str) for entry in oid):
                # Tuple with strings: translate through cmdgen.MibVariable;
                # the value must then be a plain Python type.
                assert isinstance(value, (int, str, bool))
                translated.append((cmdgen.MibVariable(*oid), value))
            else:
                # Numeric tuple: value must be a rfc1902/pyasn1 type.
                if not oid[-1] == 0:
                    assert isinstance(value, asn1_types)
                translated.append((oid, value))
        elif isinstance(oid, str):
            # String OID: node id lookup; value must be a rfc1902/pyasn1
            # type when the OID is not a scalar.
            if not oid.endswith(".0"):
                assert isinstance(value, asn1_types)
            translated.append((nodeid(oid), value))

    generator = cmdgen.CommandGenerator()
    target = cmdgen.UdpTransportTarget(
        (self.host, self.port), timeout=self.timeout, retries=self.retries
    )
    response = generator.setCmd(self.auth, target, *translated)
    error_indication, error_status, error_index, varbinds = response  # pylint: disable=W0612

    if error_indication or error_status:
        self.__set_error(error_indication, error_status, error_index, varbinds)
        raise SnmpError(
            "SNMP set command on %s of oid values %r failed" % (self.host, translated),
            error_indication, error_status, error_index, varbinds
        )

    return SnmpVarBinds(varbinds)
def __get_json(self, keytype=str):
    """Return ``self.get_dict()`` converted to plain Python values.

    ``OctetString`` and ``ObjectIdentifier`` values become ``str``,
    ``Integer`` values become ``int``; keys are passed through *keytype*.
    Raises ``AssertionError`` for a value of any other type.
    """
    # Checked in this order; the first matching type decides the converter.
    converters = (
        (univ.OctetString, str),
        (univ.Integer, int),  # pylint:disable=R0204
        (univ.ObjectIdentifier, str),
    )

    result = {}
    for key, value in self.get_dict().items():
        for asn1_type, convert in converters:
            if isinstance(value, asn1_type):
                plain = convert(value)
                break
        else:
            raise AssertionError(
                "Unknown type %s encountered for oid %s" % (value.__class__.__name__, key)
            )
        result[keytype(key)] = plain

    return result
def clone(self, value=noValue, **kwargs):
    """Creates a copy of a |ASN.1| type or object.

    Any parameters to the *clone()* method will replace corresponding
    properties of the |ASN.1| object.  This override only delegates to
    ``univ.OctetString.clone``; the keyword parameters below are passed
    through ``**kwargs`` unchanged.

    Parameters
    ----------
    value: :class:`unicode`, :class:`str`, :class:`bytes` or |ASN.1| object
        unicode object (Python 2) or string (Python 3), alternatively
        string (Python 2) or bytes (Python 3) representing octet-stream
        of serialized unicode string (note `encoding` parameter) or
        |ASN.1| class instance.

    tagSet: :py:class:`~pyasn1.type.tag.TagSet`
        Object representing non-default ASN.1 tag(s)

    subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
        Object representing non-default ASN.1 subtype constraint(s)

    encoding: :py:class:`str`
        Unicode codec ID to encode/decode :py:class:`unicode` (Python 2)
        or :py:class:`str` (Python 3) the payload when |ASN.1| object is
        used in octet-stream context.

    Returns
    -------
    :
        new instance of |ASN.1| type/value
    """
    return univ.OctetString.clone(self, value, **kwargs)
def subtype(self, value=noValue, **kwargs):
    """Creates a copy of a |ASN.1| type or object.

    Any parameters to the *subtype()* method will be added to the
    corresponding properties of the |ASN.1| object.  This override only
    delegates to ``univ.OctetString.subtype``; the keyword parameters
    below are passed through ``**kwargs`` unchanged.

    Parameters
    ----------
    value: :class:`unicode`, :class:`str`, :class:`bytes` or |ASN.1| object
        unicode object (Python 2) or string (Python 3), alternatively
        string (Python 2) or bytes (Python 3) representing octet-stream
        of serialized unicode string (note `encoding` parameter) or
        |ASN.1| class instance.

    implicitTag: :py:class:`~pyasn1.type.tag.Tag`
        Implicitly apply given ASN.1 tag object to caller's
        :py:class:`~pyasn1.type.tag.TagSet`, then use the result as new
        object's ASN.1 tag(s).

    explicitTag: :py:class:`~pyasn1.type.tag.Tag`
        Explicitly apply given ASN.1 tag object to caller's
        :py:class:`~pyasn1.type.tag.TagSet`, then use the result as new
        object's ASN.1 tag(s).

    subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
        Object representing non-default ASN.1 subtype constraint(s)

    encoding: :py:class:`str`
        Unicode codec ID to encode/decode :py:class:`unicode` (Python 2)
        or :py:class:`str` (Python 3) the payload when |ASN.1| object is
        used in octet-stream context.

    Returns
    -------
    :
        new instance of |ASN.1| type/value
    """
    return univ.OctetString.subtype(self, value, **kwargs)
def value(self):
    """Return the outlet label, built from ``self.outlet_number``, as an
    ``univ.OctetString``."""
    label = 'Outlet #{}'.format(self.outlet_number)
    return univ.OctetString(label)
def test_read_system_description(self):
    """The ``sysDescr`` mapping entry holds the expected description."""
    expected = univ.OctetString("Universal RPC Host Module (virtualpdu)")
    actual = self.pdu.oid_mapping[sysDescr].value
    self.assertEqual(expected, actual)
def test_read_outlet_name(self):
    """The entry at ``self.outlet_name_oid`` reports 'Outlet #1'."""
    entry = self.pdu.oid_mapping[self.outlet_name_oid]
    expected = univ.OctetString('Outlet #1')
    self.assertEqual(expected, entry.value)
def test_read_system_description(self):
    """The ``sysDescr`` mapping entry holds the expected description."""
    expected = univ.OctetString("APC Rack PDU (virtualpdu)")
    actual = self.pdu.oid_mapping[sysDescr].value
    self.assertEqual(expected, actual)
def clone(self, value=noValue, tagSet=None, subtypeSpec=None,
          encoding=None, binValue=noValue, hexValue=noValue):
    """Creates a copy of a |ASN.1| type or object.

    Any parameters to the *clone()* method will replace corresponding
    properties of the |ASN.1| object.  This override only delegates to
    ``univ.OctetString.clone``, forwarding every argument positionally
    in the order declared here.

    Parameters
    ----------
    value: :class:`unicode`, :class:`str`, :class:`bytes` or |ASN.1| object
        unicode object (Python 2) or string (Python 3), alternatively
        string (Python 2) or bytes (Python 3) representing octet-stream
        of serialized unicode string (note `encoding` parameter) or
        |ASN.1| class instance.

    tagSet: :py:class:`~pyasn1.type.tag.TagSet`
        Object representing non-default ASN.1 tag(s)

    subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
        Object representing non-default ASN.1 subtype constraint(s)

    encoding: :py:class:`str`
        Unicode codec ID to encode/decode :py:class:`unicode` (Python 2)
        or :py:class:`str` (Python 3) the payload when |ASN.1| object is
        used in octet-stream context.

    binValue, hexValue:
        Forwarded unchanged to ``univ.OctetString.clone``.
        NOTE(review): presumably alternative initializers given as a
        binary / hexadecimal digit string -- confirm against pyasn1.

    Returns
    -------
    :
        new instance of |ASN.1| type/value
    """
    return univ.OctetString.clone(self, value, tagSet, subtypeSpec, encoding, binValue, hexValue)
def subtype(self, value=noValue, implicitTag=None, explicitTag=None,
            subtypeSpec=None, encoding=None, binValue=noValue,
            hexValue=noValue):
    """Creates a copy of a |ASN.1| type or object.

    Any parameters to the *subtype()* method will be added to the
    corresponding properties of the |ASN.1| object.  This override only
    delegates to ``univ.OctetString.subtype``, forwarding every argument
    positionally in the order declared here.

    Parameters
    ----------
    value: :class:`unicode`, :class:`str`, :class:`bytes` or |ASN.1| object
        unicode object (Python 2) or string (Python 3), alternatively
        string (Python 2) or bytes (Python 3) representing octet-stream
        of serialized unicode string (note `encoding` parameter) or
        |ASN.1| class instance.

    implicitTag: :py:class:`~pyasn1.type.tag.Tag`
        Implicitly apply given ASN.1 tag object to caller's
        :py:class:`~pyasn1.type.tag.TagSet`, then use the result as new
        object's ASN.1 tag(s).

    explicitTag: :py:class:`~pyasn1.type.tag.Tag`
        Explicitly apply given ASN.1 tag object to caller's
        :py:class:`~pyasn1.type.tag.TagSet`, then use the result as new
        object's ASN.1 tag(s).

    subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
        Object representing non-default ASN.1 subtype constraint(s)

    encoding: :py:class:`str`
        Unicode codec ID to encode/decode :py:class:`unicode` (Python 2)
        or :py:class:`str` (Python 3) the payload when |ASN.1| object is
        used in octet-stream context.

    binValue, hexValue:
        Forwarded unchanged to ``univ.OctetString.subtype``.
        NOTE(review): presumably alternative initializers given as a
        binary / hexadecimal digit string -- confirm against pyasn1.

    Returns
    -------
    :
        new instance of |ASN.1| type/value
    """
    return univ.OctetString.subtype(self, value, implicitTag, explicitTag, subtypeSpec, encoding, binValue, hexValue)
def generateAuthSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* into an encoded ``NegotiationToken`` response.

    The payload becomes the ``responseToken`` (explicit context tag 2) of
    a ``NegTokenTarg`` carrying explicit constructed context tag 1, which
    is stored as ``negTokenTarg`` of a ``NegotiationToken``.  The result
    of ``encoder.encode()`` on that token is returned.
    """
    token_tag = tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
    response_token = univ.OctetString(ntlm_data).subtype(explicitTag=token_tag)

    targ_tag = tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
    neg_targ = NegTokenTarg().subtype(explicitTag=targ_tag)
    neg_targ.setComponentByName('responseToken', response_token)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenTarg', neg_targ)

    return encoder.encode(negotiation)
def registerContextName(self, contextName, mibInstrum=None):
    """Register *contextName* in ``self.contextNames``.

    The name is normalised to octets first.  When *mibInstrum* is None
    the entry stored under the ``null`` key is reused for the new name.
    Raises ``error.PySnmpError`` if the name is already registered.
    """
    contextName = univ.OctetString(contextName).asOctets()

    if contextName in self.contextNames:
        raise error.PySnmpError(
            'Duplicate contextName %s' % contextName
        )

    # Logging is emitted only when the instrumentation debug flag is set.
    if debug.logger & debug.flagIns:
        debug.logger(
            'registerContextName: registered contextName %r, mibInstrum %r' % (contextName, mibInstrum)
        )

    if mibInstrum is not None:
        self.contextNames[contextName] = mibInstrum
    else:
        self.contextNames[contextName] = self.contextNames[null]
def unregisterContextName(self, contextName):
    """Remove *contextName* from ``self.contextNames`` if it is present.

    The name is normalised to octets first; an unknown name is silently
    ignored.
    """
    contextName = univ.OctetString(contextName).asOctets()

    if contextName not in self.contextNames:
        return

    # Logging is emitted only when the instrumentation debug flag is set.
    if debug.logger & debug.flagIns:
        debug.logger(
            'unregisterContextName: unregistered contextName %r' % contextName
        )

    del self.contextNames[contextName]
def getMibInstrum(self, contextName=null):
    """Return the entry of ``self.contextNames`` stored for *contextName*.

    The name is normalised to octets first.  Raises
    ``error.PySnmpError`` when the name is not registered.
    """
    contextName = univ.OctetString(contextName).asOctets()
    # Logging is emitted only when the instrumentation debug flag is set.
    loggingOn = debug.logger & debug.flagIns

    if contextName not in self.contextNames:
        if loggingOn:
            debug.logger('getMibInstrum: contextName %r not registered' % contextName)
        raise error.PySnmpError(
            'Missing contextName %s' % contextName
        )

    if loggingOn:
        debug.logger(
            'getMibInstrum: contextName %r, mibInstum %r' % (contextName, self.contextNames[contextName])
        )
    return self.contextNames[contextName]
def prettyIn(self, value):
    """Normalise an IP address value before handing it to OctetString.

    A ``str`` whose length is not 4 is parsed as dotted notation into a
    list of integers; any other input (including a 4-character string)
    is used as given.  The value must then have exactly four elements
    and is passed on to ``univ.OctetString.prettyIn``.

    Raises ``error.ProtocolError`` when a dotted component is not an
    integer or when the value does not have four elements.
    """
    if isinstance(value, str) and len(value) != 4:
        try:
            value = [int(x) for x in value.split('.')]
        # Only a failed int() conversion means bad syntax.  A bare
        # ``except`` would also swallow KeyboardInterrupt/SystemExit.
        except ValueError:
            raise error.ProtocolError('Bad IP address syntax %s' % value)

    if len(value) != 4:
        raise error.ProtocolError('Bad IP address syntax')

    return univ.OctetString.prettyIn(self, value)
def clone(self, *args, **kwargs):
    """Clone via ``univ.OctetString.clone`` and carry over the fixed length.

    Returns whatever ``setFixedLength()`` returns when called on the
    clone with this object's ``getFixedLength()`` value.
    """
    duplicate = univ.OctetString.clone(self, *args, **kwargs)
    return duplicate.setFixedLength(self.getFixedLength())
def prettyIn(self, value):
    """Normalise an IP address value and verify it is four octets long.

    A ``str`` whose length is not 4 is parsed as dotted notation into a
    list of integers; any other input (including a 4-character string)
    is used as given.  The value is then converted by
    ``OctetString.prettyIn`` and the result must have length 4.

    Raises ``error.ProtocolError`` when a dotted component is not an
    integer or when the converted value is not four elements long.
    """
    if isinstance(value, str) and len(value) != 4:
        try:
            value = [int(x) for x in value.split('.')]
        # Only a failed int() conversion means bad syntax.  A bare
        # ``except`` would also swallow KeyboardInterrupt/SystemExit.
        except ValueError:
            raise error.ProtocolError('Bad IP address syntax %s' % value)

    value = OctetString.prettyIn(self, value)
    if len(value) != 4:
        raise error.ProtocolError('Bad IP address syntax')

    return value
def __init__(self, value=univ.noValue, **kwargs):
    """Initialise through ``OctetString.__init__``.

    When the caller does not pass a ``namedValues`` keyword, the object's
    own ``namedValues`` attribute is supplied in its place.
    """
    kwargs.setdefault('namedValues', self.namedValues)
    OctetString.__init__(self, value, **kwargs)
def prettyIn(self, bits):
    """Convert *bits* into the value accepted by ``OctetString.prettyIn``.

    Anything that is not a tuple or list is treated as a raw bit string
    and passed straight through.  A tuple/list is read as a collection of
    named bits: each name is resolved to a bit position with
    ``self.namedValues.getValue``, and that bit is set in a list of
    octets, counting from the most significant bit of the first octet.

    Raises ``error.ProtocolError`` for a name that resolves to None.
    """
    # Raw bit string: nothing to translate.
    if not isinstance(bits, (tuple, list)):
        return OctetString.prettyIn(self, bits)

    # Tuple of named bits.
    octets = []
    for name in bits:
        position = self.namedValues.getValue(name)
        if position is None:
            raise error.ProtocolError('Unknown named bit %s' % name)

        octetIndex, bitOffset = divmod(position, 8)

        # Grow the octet list with zeros until the target index exists.
        shortfall = octetIndex - len(octets) + 1
        if shortfall > 0:
            octets.extend([0] * shortfall)

        octets[octetIndex] |= 0x01 << (7 - bitOffset)

    return OctetString.prettyIn(self, octets)