我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sha.new()。
def request(self, method, request_uri, headers, content):
    """Add an HMACDigest ``authorization`` header to ``headers``.

    The response digest is an HMAC, keyed with ``self.key`` and using
    ``self.hashmod``, over the method, the request URI, a fresh client
    nonce, the server nonce from the stored challenge and the concatenated
    values of the headers selected by ``_get_end2end_headers``.

    ``headers`` is modified in place; ``content`` is not used. Nothing is
    returned.
    """
    signed_names = _get_end2end_headers(headers)
    # Space-terminated header names, echoed back in the ``headers`` field.
    name_list = "".join("%s " % name for name in signed_names)
    joined_values = "".join(headers[name] for name in signed_names)
    timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    client_nonce = _cnonce()
    server_nonce = self.challenge['snonce']

    to_sign = "%s:%s:%s:%s:%s" % (
        method, request_uri, client_nonce, server_nonce, joined_values)
    signature = hmac.new(self.key, to_sign, self.hashmod).hexdigest().lower()

    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % (
        self.credentials[0],
        self.challenge['realm'],
        server_nonce,
        client_nonce,
        request_uri,
        timestamp,
        signature,
        name_list)
def is_revoked(self, crl_list):
    """
    Return True if any CRL in ``crl_list`` marks this certificate as revoked.

    ``crl_list`` is a list of trusted CRLs whose signatures have already
    been verified against trusted anchors.

    _All_ CRLs issued by this certificate's issuer are checked, not only
    the first or the freshest one. Consequently, if the certificate was on
    hold in a previous CRL and is valid again in a newer CRL, and both are
    in the list, it is still reported as revoked: the revocation status is
    not interpreted.

    A CRL is considered to come from the certificate's issuer when the
    Authority Key Identifier is available in _both_ the CRL and the
    certificate and the two are equal; otherwise the issuers are simply
    compared.
    """
    for crl in crl_list:
        same_key_id = (self.authorityKeyID is not None and
                       crl.authorityKeyID is not None and
                       self.authorityKeyID == crl.authorityKeyID)
        if not (same_key_id or self.issuer == crl.issuer):
            continue
        # Only a positive hit ends the search: a matching CRL that does not
        # list the serial must not hide a later CRL that does.
        if any(entry[0] == self.serial for entry in crl.revoked_cert_serials):
            return True
    return False
def __init__(self, seed=None): self.pool_index = 0 self.digest = None self.next_byte = 0 self.lock = _threading.Lock() try: import hashlib self.hash = hashlib.sha1() self.hash_len = 20 except: try: import sha self.hash = sha.new() self.hash_len = 20 except: import md5 self.hash = md5.new() self.hash_len = 16 self.pool = bytearray(b'\0' * self.hash_len) if seed is not None: self.stir(bytearray(seed)) self.seeded = True else: self.seeded = False
def moveTo(self, destination):
    """Move this path to ``destination``.

    A plain ``os.rename`` is tried first, followed by ``self.restat(False)``.
    If that fails with ``EXDEV`` the tree is copied instead and the original
    is removed afterwards. Any other ``OSError`` is re-raised unchanged.

    ``destination`` must provide ``path`` and ``temporarySibling()``.
    """
    try:
        os.rename(self.path, destination.path)
        self.restat(False)
    except OSError as ose:
        if ose.errno != errno.EXDEV:
            raise
        # man 2 rename, ubuntu linux 5.10 "breezy":
        #   oldpath and newpath are not on the same mounted filesystem.
        #   (Linux permits a filesystem to be mounted at multiple
        #   points, but rename(2) does not work across different mount
        #   points, even if the same filesystem is mounted on both.)
        # That means it's time to copy trees of directories!
        secsib = destination.temporarySibling()
        self.copyTo(secsib)  # slow
        secsib.moveTo(destination)  # visible
        # Done creating new stuff; now clean up the original.
        mysecsib = self.temporarySibling()
        self.moveTo(mysecsib)  # visible
        mysecsib.remove()  # slow
def testAuth(self):
    """Check the component handshake digest and the authenticated event.

    After the stream header is received, the second item written to the
    transport must be a ``<handshake>`` element carrying the SHA-1 hex
    digest of the stream id followed by the secret. Receiving an empty
    ``<handshake/>`` must then leave ``self.authComplete`` set to True.
    """
    stream_id = "12345"
    secret = "secret"
    self.authComplete = False
    written = []

    authenticator = ConnectComponentAuthenticator("cjid", secret)
    stream = xmlstream.XmlStream(authenticator)
    stream.transport = DummyTransport(written)
    stream.addObserver(xmlstream.STREAM_AUTHD_EVENT, self.authPassed)

    # Go...
    stream.connectionMade()
    stream.dataReceived("<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' from='cjid' id='12345'>")

    # Calculate what we expect the handshake value to be
    expected_digest = sha.new("%s%s" % (stream_id, secret)).hexdigest()
    self.assertEquals(written[1], "<handshake>%s</handshake>" % (expected_digest))

    stream.dataReceived("<handshake/>")
    self.assertEquals(self.authComplete, True)
def _continueGEX_GROUP(self, ignored, pubKey, f, signature):
    """Verify the server's signature over the exchange hash and set up keys.

    The shared secret is ``_MPpow(f, self.x, DH_PRIME)``. The exchange hash
    is the SHA-1 digest of, in order: both version strings, both KEXINIT
    payloads, the server public key, our DH public key, ``f`` and the
    shared secret.

    If the signature does not verify, a disconnect with
    ``DISCONNECT_KEY_EXCHANGE_FAILED`` is sent and nothing else happens;
    otherwise ``self._keySetup`` is called. ``ignored`` is not used.
    """
    serverKey = keys.getPublicKeyObject(pubKey)
    sharedSecret = _MPpow(f, self.x, DH_PRIME)

    # The order of these fields defines the hash; do not reorder.
    hashedFields = (
        NS(self.ourVersionString),
        NS(self.otherVersionString),
        NS(self.ourKexInitPayload),
        NS(self.serverKexInitPayload),
        NS(pubKey),
        MP(self.DHpubKey),
        MP(f),
        sharedSecret,
    )
    hasher = sha.new()
    for field in hashedFields:
        hasher.update(field)
    exchangeHash = hasher.digest()

    if keys.verifySignature(serverKey, signature, exchangeHash):
        self._keySetup(sharedSecret, exchangeHash)
        return
    self.sendDisconnect(DISCONNECT_KEY_EXCHANGE_FAILED, 'bad signature')
def _continueGEX_REPLY(self, ignored, pubKey, f, signature):
    """Verify the server's signature over the exchange hash and set up keys.

    The shared secret is ``_MPpow(f, self.x, self.p)``. The exchange hash
    is the SHA-1 digest of, in order: both version strings, both KEXINIT
    payloads, the server public key, a fixed 4-byte field, the group
    parameters ``self.p`` and ``self.g``, our DH public key, ``f`` and the
    shared secret.

    If the signature does not verify, a disconnect with
    ``DISCONNECT_KEY_EXCHANGE_FAILED`` is sent and nothing else happens;
    otherwise ``self._keySetup`` is called. ``ignored`` is not used.
    """
    serverKey = keys.getPublicKeyObject(pubKey)
    sharedSecret = _MPpow(f, self.x, self.p)

    # The order of these fields defines the hash; do not reorder.
    hashedFields = (
        NS(self.ourVersionString),
        NS(self.otherVersionString),
        NS(self.ourKexInitPayload),
        NS(self.serverKexInitPayload),
        NS(pubKey),
        # 2048 as a big-endian 32-bit integer; presumably the group size
        # that was requested -- confirm against the request code.
        '\x00\x00\x08\x00',
        MP(self.p),
        MP(self.g),
        MP(self.DHpubKey),
        MP(f),
        sharedSecret,
    )
    hasher = sha.new()
    for field in hashedFields:
        hasher.update(field)
    exchangeHash = hasher.digest()

    if keys.verifySignature(serverKey, signature, exchangeHash):
        self._keySetup(sharedSecret, exchangeHash)
        return
    self.sendDisconnect(DISCONNECT_KEY_EXCHANGE_FAILED, 'bad signature')
def __init__(self, seed=None): self.pool_index = 0 self.digest = None self.next_byte = 0 self.lock = _threading.Lock() try: import hashlib self.hash = hashlib.sha1() self.hash_len = 20 except: try: import sha self.hash = sha.new() self.hash_len = 20 except: import md5 self.hash = md5.new() self.hash_len = 16 self.pool = '\0' * self.hash_len if not seed is None: self.stir(seed) self.seeded = True else: self.seeded = False
def request(self, method, request_uri, headers, content):
    """Sign the request by setting ``headers['authorization']``.

    A fresh client nonce and a UTC timestamp are generated. The method,
    URI, client nonce, server nonce (from ``self.challenge``) and the
    concatenated values of the headers returned by
    ``_get_end2end_headers`` are joined with ``:`` and HMAC-ed with
    ``self.key`` / ``self.hashmod``. The lower-cased hex digest is sent as
    the ``response`` field of an ``HMACDigest`` authorization header.

    ``headers`` is updated in place and ``content`` is ignored.
    """
    covered = _get_end2end_headers(headers)
    covered_names = "".join(["%s " % header_name for header_name in covered])
    covered_values = "".join([headers[header_name] for header_name in covered])
    created_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    nonce = _cnonce()

    message = "%s:%s:%s:%s:%s" % (
        method,
        request_uri,
        nonce,
        self.challenge['snonce'],
        covered_values,
    )
    mac = hmac.new(self.key, message, self.hashmod)
    response_digest = mac.hexdigest().lower()

    header_fields = (
        self.credentials[0],
        self.challenge['realm'],
        self.challenge['snonce'],
        nonce,
        request_uri,
        created_at,
        response_digest,
        covered_names,
    )
    headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % header_fields
def _send_command(self, command, sql):
    """Write one command packet to ``self.wfile`` and flush it.

    The packet is a little-endian 32-bit integer holding ``len(sql) + 1``,
    then the command byte, then ``sql``. A ``unicode`` ``sql`` is first
    encoded with ``self.charset``.

    If there is no socket, ``self.errorhandler`` is invoked with
    ``InterfaceError``. If the previous result is still an active
    unbuffered query, it is finished before anything is sent.
    """
    if not self.socket:
        self.errorhandler(None, InterfaceError, "(0, '')")

    # An unbuffered query from the last command must be drained before a
    # new command goes out.
    previous = self._result
    if previous is not None and previous.unbuffered_active:
        previous._finish_unbuffered_query()

    if isinstance(sql, unicode):
        sql = sql.encode(self.charset)

    header = struct.pack('<i', len(sql) + 1) + int2byte(command)
    packet = header + sql
    self.wfile.write(packet)
    self.wfile.flush()

    if DEBUG:
        dump_packet(packet)
def _SetDefaultsFromSchema(self): """Assign object default values according to the schema. This will not overwrite properties that have already been set.""" defaults = {} for property, attributes in self._schema.iteritems(): (is_list, property_type, is_strong, is_required) = attributes[0:4] if is_required and len(attributes) >= 5 and \ not property in self._properties: default = attributes[4] defaults[property] = default if len(defaults) > 0: # Use do_copy=True so that each new object gets its own copy of strong # objects, lists, and dicts. self.UpdateProperties(defaults, do_copy=True)
def AddFile(self, path, settings=None):
    """Add ``path`` to this phase, creating a PBXBuildFile when needed.

    The file reference for ``path`` is obtained from the group returned by
    ``self.FileGroup``. If that reference is a ``PBXVariantGroup`` that
    this phase already tracks, ``path`` is only registered against the
    existing build file. Otherwise a new ``PBXBuildFile`` is created,
    carrying ``settings`` when it is not None, and appended.
    """
    (file_group, hierarchical) = self.FileGroup(path)
    file_ref = file_group.AddOrGetFileByPath(path, hierarchical)

    known_variant = (file_ref in self._files_by_xcfilelikeelement and
                     isinstance(file_ref, PBXVariantGroup))
    if known_variant:
        # There's already a PBXBuildFile in this phase corresponding to the
        # PBXVariantGroup.  path just provides a new variant that belongs to
        # the group.  Add the path to the dict.
        existing = self._files_by_xcfilelikeelement[file_ref]
        self._AddBuildFileToDicts(existing, path)
        return

    # Add a new PBXBuildFile to get file_ref into the phase.
    properties = {'fileRef': file_ref}
    if settings is not None:
        properties['settings'] = settings
    self.AppendBuildFile(PBXBuildFile(properties), path)
def check(self, data, digest):
    """Assert that the SHA digest of ``data`` has the hex value ``digest``.

    Three things are asserted: ``hexdigest()`` equals ``digest``, a second
    ``hexdigest()`` call on the same object returns the same value, and
    hex-encoding the bytes of ``digest()`` by hand gives that value too.
    """
    hasher = sha.new(data)

    # Check digest matches the expected value
    first_hex = hasher.hexdigest()
    self.assertTrue(first_hex == digest)

    # Verify that the value doesn't change between two consecutive
    # digest operations.
    second_hex = hasher.hexdigest()
    self.assertTrue(first_hex == second_hex)

    # Check hexdigest() output matches digest()'s output
    raw = hasher.digest()
    hex_from_raw = "".join(['%02x' % ord(char) for char in raw])
    self.assertTrue(first_hex == hex_from_raw)
def create_profile(self, user):
    """
    Create, store and return a ``RegistrationProfile`` for ``user``.

    The activation key is the SHA1 hex digest of a five-character salt
    followed by the ``User``'s username. The salt is the first five hex
    characters of the SHA1 digest of ``str(random.random())``.

    """
    random_digest = sha.new(str(random.random())).hexdigest()
    salt = random_digest[:5]
    activation_key = sha.new(salt + user.username).hexdigest()

    # prepend "key_" to the key_name, because key_names can't start with numbers
    profile = RegistrationProfile(
        user=user,
        activation_key=activation_key,
        key_name="key_" + activation_key,
    )
    profile.put()
    return profile
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt ``dataToEncrypt`` with AES in CFB mode (128-bit segments).

    ``privParameters`` is unpacked as (snmpEngineBoots, snmpEngineTime,
    salt); the key, IV and the salt actually used come from
    ``self.__getEncryptionKey``.

    Returns a pair of ``univ.OctetString``: (ciphertext, salt).
    Raises ``error.StatusInformation`` with ``errind.encryptionError``
    when ``AES`` is None.

    The numbered comments are section references kept from the original
    code (presumably RFC 3826 -- confirm).
    """
    if AES is None:
        raise error.StatusInformation(errorIndication=errind.encryptionError)

    snmpEngineBoots, snmpEngineTime, salt = privParameters

    # 3.3.1.1
    aesKey, iv, salt = self.__getEncryptionKey(
        encryptKey, snmpEngineBoots, snmpEngineTime
    )

    # 3.3.1.3
    cipher = AES.new(aesKey, AES.MODE_CFB, iv, segment_size=128)

    # PyCrypto seems to require padding. Between 1 and 16 zero bytes are
    # appended; input that is already block-aligned gains a full block.
    padLength = 16 - len(dataToEncrypt) % 16
    padded = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()

    ciphertext = cipher.encrypt(padded)

    # 3.3.1.4
    return univ.OctetString(ciphertext), univ.OctetString(salt)
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt ``dataToEncrypt`` with DES in CBC mode.

    ``privParameters`` is unpacked as (snmpEngineBoots, snmpEngineTime,
    salt); the key, the salt actually used and the IV come from
    ``self.__getEncryptionKey``.

    Returns a pair of ``univ.OctetString``: (ciphertext, salt).
    Raises ``error.StatusInformation`` with ``errind.encryptionError``
    when ``DES`` is None.

    The numbered comments are section references kept from the original
    code (presumably RFC 3414 -- confirm).
    """
    if DES is None:
        raise error.StatusInformation(errorIndication=errind.encryptionError)

    snmpEngineBoots, snmpEngineTime, salt = privParameters

    # 8.3.1.1
    desKey, salt, iv = self.__getEncryptionKey(encryptKey, snmpEngineBoots)

    # 8.3.1.2
    saltOctets = univ.OctetString(salt)

    # 8.1.1.2
    cipher = DES.new(desKey, DES.MODE_CBC, iv)
    # Between 1 and 8 zero bytes are appended; input that is already
    # block-aligned gains a full block.
    padLength = 8 - len(dataToEncrypt) % 8
    plaintext = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(plaintext)

    # 8.3.1.3 & 4
    return univ.OctetString(ciphertext), saltOctets
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt ``dataToEncrypt`` with Triple DES in CBC mode.

    ``privParameters`` is unpacked as (snmpEngineBoots, snmpEngineTime,
    salt); the key, the salt actually used and the IV come from
    ``self.__getEncryptionKey``.

    Returns a pair of ``univ.OctetString``: (ciphertext, salt).
    Raises ``error.StatusInformation`` with ``errind.encryptionError``
    when ``DES3`` is None.
    """
    if DES3 is None:
        raise error.StatusInformation(errorIndication=errind.encryptionError)

    snmpEngineBoots, snmpEngineTime, salt = privParameters

    des3Key, salt, iv = self.__getEncryptionKey(encryptKey, snmpEngineBoots)

    cipher = DES3.new(des3Key, DES3.MODE_CBC, iv)
    saltOctets = univ.OctetString(salt)

    # Between 1 and 8 zero bytes are appended; input that is already
    # block-aligned gains a full block.
    padLength = 8 - len(dataToEncrypt) % 8
    plaintext = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()
    ciphertext = cipher.encrypt(plaintext)

    # 5.1.1.3
    return univ.OctetString(ciphertext), saltOctets
def _build_token(self):
    """Build the ``X-WSSE`` header dict for a UsernameToken.

    A UUID4 string is used as nonce and the current UTC time (ISO format
    plus ``Z``) as creation date. The password digest is the base64 of the
    SHA-1 digest of nonce + creation date + ``self.secret``.

    Returns ``{'X-WSSE': 'UsernameToken ' + <serialized properties>}``,
    where the properties are serialized by ``self._serialize_header``.
    """
    nonce = str(uuid.uuid4())
    encoded_nonce = binascii.b2a_base64(binascii.a2b_qp(nonce))
    created = datetime.utcnow().isoformat() + 'Z'

    raw_digest = sha.new(nonce + created + self.secret).digest()
    encoded_digest = binascii.b2a_base64(raw_digest)

    # b2a_base64 appends a newline, hence the strip() calls.
    token_fields = {
        "Username": self.username,
        "PasswordDigest": encoded_digest.strip(),
        "Nonce": encoded_nonce.strip(),
        "Created": created,
    }
    return {'X-WSSE': 'UsernameToken ' + self._serialize_header(token_fields)}
def response(self, response, content):
    """Hook called with the last authorized response.

    Gives us a chance to update with new nonces or such returned from
    the last authorized response. Override this in sub-classes if
    necessary.

    Return TRUE if the request is to be retried, for example Digest may
    return stale=true. This implementation never asks for a retry.
    """
    return False
def __init__(self, credentials, host, request_uri, headers, response, content, http):
    """Parse the ``hmacdigest`` challenge and derive the signing key.

    The challenge is read from the ``www-authenticate`` header of
    ``response`` and normalised in place:

    * ``reason`` defaults to ``unauthorized``; any value other than
      ``unauthorized`` or ``integrity`` is replaced by ``unauthorized``.
    * ``salt`` defaults to the empty string.
    * ``algorithm`` defaults to ``HMAC-SHA-1`` (``HMAC-MD5`` also allowed).
    * ``pw-algorithm`` defaults to ``SHA-1`` (``MD5`` also allowed).

    ``self.key`` is the lower-cased hex digest, using the password hash
    module, of ``username:H(password + salt):realm``.

    Raises ``UnimplementedHmacDigestAuthOptionError`` when the server
    nonce is missing or empty, or when either algorithm is unsupported.
    """
    Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
    challenge = _parse_www_authenticate(response, 'www-authenticate')
    self.challenge = challenge['hmacdigest']
    # TODO: self.challenge['domain']
    self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
    if self.challenge['reason'] not in ['unauthorized', 'integrity']:
        self.challenge['reason'] = 'unauthorized'
    self.challenge['salt'] = self.challenge.get('salt', '')
    if not self.challenge.get('snonce'):
        raise UnimplementedHmacDigestAuthOptionError(
            _("The challenge doesn't contain a server nonce, or this one is empty."))

    # The offending value is interpolated *after* the ``_()`` lookup so
    # the lookup key is the constant template, not the formatted message.
    self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
    if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
        raise UnimplementedHmacDigestAuthOptionError(
            _("Unsupported value for algorithm: %s.") % self.challenge['algorithm'])
    self.challenge['pw-algorithm'] = self.challenge.get('pw-algorithm', 'SHA-1')
    if self.challenge['pw-algorithm'] not in ['SHA-1', 'MD5']:
        raise UnimplementedHmacDigestAuthOptionError(
            _("Unsupported value for pw-algorithm: %s.") % self.challenge['pw-algorithm'])

    if self.challenge['algorithm'] == 'HMAC-MD5':
        self.hashmod = _md5
    else:
        self.hashmod = _sha
    if self.challenge['pw-algorithm'] == 'MD5':
        self.pwhashmod = _md5
    else:
        self.pwhashmod = _sha

    salted_password = "".join([self.credentials[1], self.challenge['salt']])
    self.key = "".join([self.credentials[0], ":",
                        self.pwhashmod.new(salted_password).hexdigest().lower(),
                        ":", self.challenge['realm']])
    self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def __init__(self, cache, safe=safename):
    """Remember the cache directory and create it if it does not exist.

    ``cache`` is the directory path. ``safe`` is stored as ``self.safe``
    and defaults to ``safename``.
    """
    # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
    self.cache = cache
    self.safe = safe
    directory_missing = not os.path.exists(cache)
    if directory_missing:
        os.makedirs(self.cache)