我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 hmac.compare_digest()。
def check_signature(header_signature, request_body):
    """Validate an ``X-Hub-Signature`` header value against the request body.

    The header is expected to have the form ``sha1=<hexdigest>``. The digest
    is recomputed over ``request_body`` using the key returned by
    ``webhook_secret()`` and compared in constant time.

    Args:
        header_signature: Raw value of the signature header (may be empty).
        request_body: Bytes the signature was computed over.

    Returns:
        True when the signature matches.

    Raises:
        ValueError: If the header is missing, cannot be split into exactly
            two parts, names an algorithm other than sha1, or does not match.
    """
    if not header_signature:
        raise ValueError('No X-Hub-Signature header.')

    # Tuple unpacking raises ValueError by itself for a malformed header.
    algorithm, signature_digest = header_signature.split('=')
    if algorithm != 'sha1':
        raise ValueError('Unsupported digest algorithm {}.'.format(algorithm))

    mac = hmac.new(webhook_secret(), msg=request_body, digestmod=hashlib.sha1)
    if hmac.compare_digest(mac.hexdigest(), signature_digest):
        return True
    raise ValueError('Body digest did not match signature digest')
def check_mdp(hashed_mdp, utilisateur_mdp, hashage):
    """Check a plain-text password against a stored salted hash.

    The stored value is split on ``':'`` into the expected hex digest and the
    salt. A hash object of the requested type is then fed ``salt + password``
    (UTF-8 encoded) and its hex digest is compared to the stored one.

    Args:
        hashed_mdp: Stored value in the form ``"<hexdigest>:<salt>"``.
        utilisateur_mdp: Plain-text password supplied by the user.
        hashage: Hash algorithm name accepted by ``hashlib.new``.

    Returns:
        True if the digests are equal, False otherwise.
    """
    expected_hex, salt = hashed_mdp.split(':')
    hasher = hashlib.new(hashage)
    hasher.update((salt + utilisateur_mdp).encode('utf-8'))
    return compare_digest(expected_hex, hasher.hexdigest())
def _verify(callback, signature, app_secret_key):
    """Verify the integrity and authenticity of a received callback.

    :param callback: raw callback body (bytes) the signature was computed over
    :param signature: X-Hub-Signature header value, ``<method>=<hexdigest>``
    :param app_secret_key: app secret (text); UTF-8 encoded to form the HMAC key
    :return: True if the signature matches, otherwise False
    """
    _method, received_digest = signature.split("=")

    # Recompute the HMAC-SHA1 of the callback with the app secret as the key
    # and compare it to the digest carried in the header.
    expected_digest = hmac.new(
        app_secret_key.encode("utf-8"), callback, "sha1"
    ).hexdigest()
    return hmac.compare_digest(received_digest, expected_digest)
def validate(self):
    """Validate the login form and resolve the matching active user.

    On success ``self.user`` is set to the user record and True is returned.
    On any failure ``self.user`` stays None, an error is appended to the
    relevant field, and False is returned.
    """
    self.user = None
    if not BaseForm.validate(self):
        return False

    def fail():
        # Same error for "unknown user" and "wrong password".
        self.password.errors.append(ERROR_INVALID_USERNAME_PASSWORD)
        return False

    user = User.query.filter(User.name == self.username.data).first()
    if not user:
        # Result is discarded; presumably this keeps the unknown-user path
        # doing comparable hashing work to the known-user path.
        compare_digest(
            dummy_password,
            hash_password(self.password.data, 'the cake is a lie!'),
        )
        return fail()

    password_matches = compare_digest(
        user.password, hash_password(self.password.data, user.salt)
    )
    if not password_matches:
        return fail()

    if not user.active:
        self.username.errors.append(ERROR_ACCOUNT_DISABLED)
        return False

    self.user = user
    return True
def validate(self):
    """Validate a password-change form for the current user.

    Checks, in order: base form validation, that the new password does not
    contain the user name, that both new-password fields agree, and that the
    current password is correct. The first failing check appends its error
    to the corresponding field and returns False.
    """
    if not BaseForm.validate(self):
        return False

    new_password = self.password.data
    if current_user.name in new_password:
        self.password.errors.append(ERROR_PASSWORD_CONTAINS_USERNAME)
        return False

    if new_password != self.password_repeat.data:
        self.password_repeat.errors.append(ERROR_PASSWORD_REPEAT_MISMATCHES)
        return False

    current_hash = hash_password(self.password_current.data, current_user.salt)
    if compare_digest(current_user.password, current_hash):
        return True

    self.password_current.errors.append(ERROR_PASSWORD_INCORRECT)
    return False
def verify(cls, token, timestamp, signature):
    """Verify the signature of an event.

    The expected signature is the hex HMAC-SHA256 of ``timestamp`` followed by
    ``token``, keyed with ``settings.MAILGUN_KEY``.

    Args:
        token (str): Randomly generated alphanumeric string.
        timestamp (int): Number of seconds passed since January 1, 1970.
        signature (str): Hexadecimal string generated by the HMAC algorithm.

    Returns:
        bool: True if the signature is valid; False if it is not or if any
        argument is None.
    """
    if timestamp is None or signature is None or token is None:
        return False

    key_bytes = bytes(settings.MAILGUN_KEY, 'latin-1')
    data_bytes = bytes('{}{}'.format(timestamp, token), 'latin-1')
    expected = hmac.new(
        key=key_bytes, msg=data_bytes, digestmod=hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)
def match_webhook_secret(request):
    """Match the webhook secret sent from GitHub.

    The check only runs when the ``OVER_HEROKU`` environment variable is set
    to a non-empty value; otherwise the request is accepted as is. When it
    runs, the request is aborted with 403 for a missing or wrong signature
    and with 501 for a digest type other than sha1.

    Returns:
        True when the request is accepted.
    """
    if not os.environ.get("OVER_HEROKU", False):
        return True

    header_signature = request.headers.get('X-Hub-Signature')
    if header_signature is None:
        abort(403)

    sha_name, signature = header_signature.split('=')
    if sha_name != 'sha1':
        abort(501)

    secret = os.environ["GITHUB_PAYLOAD_SECRET"].encode()
    mac = hmac.new(secret, msg=request.data, digestmod="sha1")
    digests_match = hmac.compare_digest(str(mac.hexdigest()), str(signature))
    if not digests_match:
        abort(403)
    return True
def is_a_valid_mailgun_post(request):
    """Tell whether a POST request carries a valid Mailgun signature.

    Taken from
    http://mailgun-documentation.readthedocs.io/en/latest/user_manual.html#webhooks

    The expected signature is the hex HMAC-SHA256 of ``timestamp + token``
    keyed with ``settings.MAILGUN_PRIVATE_API_KEY`` (empty if unset).

    :param request: Request object exposing ``POST`` with the ``token``,
        ``timestamp`` and ``signature`` fields
    :return: True if the request was signed by Mailgun, otherwise False
    """
    post = request.POST
    token = post['token']
    timestamp = post['timestamp']
    signature = post['signature']

    key = getattr(settings, 'MAILGUN_PRIVATE_API_KEY', '').encode('utf-8')
    msg = '{}{}'.format(timestamp, token).encode('utf-8')
    expected = hmac.new(key=key, msg=msg, digestmod=hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature, expected)
def check(self, typestring, querydata):
    """Verify the HMAC digest supplied alongside a URL in query data.

    The first ``url`` and ``digest`` values of ``querydata`` are used. The
    expected digest is computed by ``get_hmac_sha1_digest`` from the
    configured HMAC secret, the URL and ``typestring``; the supplied digest
    is lower-cased before the constant-time comparison.

    Args:
        typestring: Value mixed into the digest together with the URL.
        querydata: Mapping whose ``'url'`` and ``'digest'`` entries are
            indexable; element 0 of each is read.

    Raises:
        InvalidQueryDataException: If ``url`` or ``digest`` is missing.
        InvalidDigestException: If the supplied digest does not match.
    """
    if 'url' not in querydata:
        raise InvalidQueryDataException('URL missing')
    if 'digest' not in querydata:
        raise InvalidQueryDataException('digest missing')

    url_string = querydata['url'][0]
    supplied_digest = querydata['digest'][0]

    correct_digest = get_hmac_sha1_digest(
        singletons.settings.HMAC_SECRET,
        url_string,
        typestring,
    )

    if not hmac.compare_digest(supplied_digest.lower(), correct_digest):
        # Report what the caller sent, never the expected value: echoing the
        # correct digest would hand a valid signature to whoever is probing.
        raise InvalidDigestException(
            'Incorrect digest supplied: ' + supplied_digest)
def validate_signature(request_body, signature, secret):
    """Validate the LINE signature of an HTTP request.

    The expected value is the base64-encoded HMAC-SHA256 of the request body
    keyed with the UTF-8 encoded secret.

    Args:
        request_body: Body of the HTTP request (bytes).
        signature: Value of the 'X-LINE-ChannelSignature' request header.
        secret: Secret used to create the digest from request_body.

    Returns:
        True if valid; False if invalid or if any argument is empty.
    """
    if not (request_body and signature and secret):
        return False

    key = secret.encode("utf-8")
    received = signature.encode("utf-8")
    mac = hmac.new(key, request_body, digestmod=hashlib.sha256)
    expected = base64.b64encode(mac.digest())
    return hmac.compare_digest(expected, received)
def check_token(secret, key, token):
    """Check that a base64/JSON token carries a valid, unexpired signature.

    The token is decoded with ``dec64`` and parsed as JSON. It must contain
    ``sig`` and ``exp``; the signature is regenerated with ``gen_signature``
    for the same expiry and compared in constant time.

    Args:
        secret: Secret passed to ``gen_signature``.
        key: Key passed to ``gen_signature``.
        token: Encoded token to check.

    Returns:
        True only when the signature matches and the token has not expired.
        Any decoding or processing error is logged at debug level and yields
        False.
    """
    try:
        data = json.loads(dec64(token))
        _log.debug(data)
        if 'sig' in data and 'exp' in data:
            gend, exp = gen_signature(secret, key, exp=data['exp'])
            if hmac.compare_digest(gend, data['sig']):
                if is_expired(data['exp']):
                    # NOTE(review): confirm `h125` is the intended helper name.
                    _log.debug('Token %s for %s is expired' % (token, h125(secret)))
                    return False
                return True
            else:
                # Reuse the already-decoded payload instead of decoding again.
                _log.debug('Token %s is not valid' % data)
                # NOTE(review): this writes the expected signature to the
                # debug log; consider dropping it where logs are shared.
                _log.debug('%s - %s' % (gend, data['sig']))
        else:
            _log.debug('Not enough info in token')
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt/SystemExit still propagate.
        _log.debug('Failed to decode token %s' % token)
    return False
def check_token(secret, key, token, extra=None):
    """Check that a base64/JSON token carries a valid, unexpired signature.

    The token is decoded with ``dec64`` and parsed as JSON. It must contain
    ``sig`` and ``exp``; the signature is regenerated with ``gen_signature``
    (including ``extra``) for the same expiry and compared in constant time.

    Returns:
        True only when the signature matches and the token has not expired.
        Any exception is logged and results in False.
    """
    try:
        payload = json.loads(dec64(token))
        _log.debug(payload)

        if not ('sig' in payload and 'exp' in payload):
            _log.debug('Not enough info in token')
            return False

        gend, exp = gen_signature(secret, key, extra, exp=payload['exp'])
        if not hmac.compare_digest(gend, payload['sig']):
            _log.debug('Token %s is not valid' % json.loads(dec64(token)))
            _log.debug('%s - %s' % (gend, payload['sig']))
            return False

        if is_expired(payload['exp']):
            _log.debug('Token %s for %s is expired' % (token, h512(secret)))
            return False
        return True
    except Exception as e:
        _log.debug('Failed to decode token %s' % token)
        _log.exception(e)
    return False
def validate(self, body, signature):
    """Check a webhook signature.

    https://devdocs.line.me/en/#webhook-authentication

    The expected value is the base64-encoded HMAC-SHA256 of the UTF-8
    encoded body, keyed with ``self.channel_secret``.

    :param str body: Request body (as text)
    :param str signature: X-Line-Signature value (as text)
    :rtype: bool
    :return: True if the signature matches the body
    """
    mac = hmac.new(self.channel_secret, body.encode('utf-8'), hashlib.sha256)
    expected = base64.b64encode(mac.digest())
    received = signature.encode('utf-8')
    return compare_digest(received, expected)
def constant_time_string_compare(a, b):
    """Compare two strings without short-circuiting on the first mismatch.

    Uses ``hmac.compare_digest`` when the running Python provides it; if the
    attribute is missing, falls back to XOR-accumulating every character
    pair (after an up-front length check).

    Args:
        a (str): A string to compare
        b (str): A string to compare

    Returns:
        bool: True if both strings are equal.
    """
    try:
        return hmac.compare_digest(a, b)
    except AttributeError:
        if len(a) != len(b):
            return False
        mismatch = 0
        for left_char, right_char in zip(a, b):
            # Accumulate differences instead of returning early.
            mismatch |= ord(left_char) ^ ord(right_char)
        return mismatch == 0
def verify_signature(second_ciphertext, alices_private_key, random_value1,
                     decrypt, random_value_size=32):
    """usage: verify_signature(response_ciphertext : bytearray,
                               alices_private_key : bytearray,
                               random_value1 : bytearray,
                               decrypt : function,
                               random_value_size=32) => shared secret

    Verifies a signature request. The decrypted plaintext is read as
    ``identifier | echoed random_value1 | random_value2``, where the two
    trailing fields are ``random_value_size`` long each.

    Returns the shared secret derived from both random values when the
    echoed value equals ``random_value1``; otherwise returns None.
    """
    plaintext = decrypt(second_ciphertext, alices_private_key)

    tail_size = 2 * random_value_size
    bobs_identifier = plaintext[:-tail_size]  # extracted but not used here
    echoed_value1 = plaintext[-tail_size:-random_value_size]
    random_value2 = plaintext[-random_value_size:]

    secret = _derive_shared_secret(random_value1, random_value2)
    return secret if hmac.compare_digest(echoed_value1, random_value1) else None
async def hook(request):
    """Handle a signed webhook request and schedule a CI run for it.

    The raw body is signed with ``get_signature`` and compared in constant
    time with the ``X-Hub-Signature`` header. On mismatch an
    ``HTTPBadRequest`` is returned. Otherwise the JSON body and the headers
    (minus the signature) are passed to ``get_hook_ctx``; when that yields a
    reference, ``ci(ref)`` is scheduled on the application loop.

    Declared ``async`` because the body awaits ``request.read()`` and
    ``request.json()``; ``await`` is a syntax error in a plain ``def``.

    Returns:
        ``web.HTTPBadRequest`` for a bad signature, otherwise a JSON
        response containing the hook context.
    """
    body = await request.read()
    signature_ok = hmac.compare_digest(
        get_signature(body),
        request.headers.get('X-Hub-Signature', ''),
    )
    if not signature_ok:
        return web.HTTPBadRequest()

    body = await request.json()
    # Drop the signature header whatever its casing; a plain ``del`` on the
    # copied dict raised KeyError when the key was not spelled exactly so.
    headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() != 'x-hub-signature'
    }
    ref = get_hook_ctx(headers, body, clean=True)
    if ref:
        request.app.loop.create_task(ci(ref))
    return web.json_response(ref)
async def call(self, request):
    """Handle a Mandrill webhook: verify its signature and record the events.

    The expected signature is the base64-encoded HMAC-SHA1, keyed with
    ``self.app['webhook_auth_key']``, of the webhook URL followed by the
    literal ``'mandrill_events'`` and the posted event data.

    Declared ``async`` because the body awaits ``request.post()`` and the
    sender update; ``await`` is a syntax error in a plain ``def``.

    Raises:
        HTTPBadRequest: If ``mandrill_events`` is absent or is not valid JSON.
        HTTPForbidden: If the signature header does not match.
    """
    try:
        event_data = (await request.post())['mandrill_events']
    except KeyError:
        raise HTTPBadRequest(text='"mandrill_events" not found in post data')

    signed_message = (
        self.app['mandrill_webhook_url'] + 'mandrill_events' + event_data
    ).encode()
    sig_generated = base64.b64encode(
        hmac.new(
            self.app['webhook_auth_key'],
            msg=signed_message,
            digestmod=hashlib.sha1,
        ).digest()
    )
    # '<missing>' can never equal a base64 digest, so an absent header is
    # rejected through the same comparison path.
    sig_given = request.headers.get('X-Mandrill-Signature', '<missing>').encode()
    if not hmac.compare_digest(sig_generated, sig_given):
        raise HTTPForbidden(text='invalid signature')

    try:
        events = ujson.loads(event_data)
    except ValueError as e:
        raise HTTPBadRequest(text=f'invalid json data: {e}')

    await self.sender.update_mandrill_webhooks(events)
    return Response(text='message status updated\n')
def strings_equal(s1, s2):
    """Timing-attack resistant string comparison.

    Normal comparison using == will short-circuit on the first mismatching
    character. This avoids that by scanning the whole string, though we still
    reveal to a timing attack whether the strings are the same length.

    Both values are converted to text and NFKC-normalised, then compared as
    UTF-8 bytes. Comparing bytes matters: ``compare_digest`` rejects text
    containing non-ASCII characters, which normalised input may well have.

    Args:
        s1: First value; converted with ``str`` (``unicode`` as a fallback).
        s2: Second value; converted the same way.

    Returns:
        bool: True if the normalised values are equal.
    """
    try:
        s1 = unicodedata.normalize('NFKC', str(s1))
        s2 = unicodedata.normalize('NFKC', str(s2))
    except Exception:
        # Fallback for interpreters where ``str`` is not the text type.
        s1 = unicodedata.normalize('NFKC', unicode(s1))
        s2 = unicodedata.normalize('NFKC', unicode(s2))
    return compare_digest(s1.encode('utf-8'), s2.encode('utf-8'))
def test_hash_only():
    """Verify that hash signatures are calculated correctly.

    Signs 100 units of generated test data without a key, checks the result
    against a plain SHA-256 digest, then checks that verification accepts
    the full digest (256 bits), a 16-byte prefix (128 bits), and rejects an
    empty signature.
    """
    payload = gen_test_data(100)
    provider = BasicAuthProvider()

    signed = provider.sign_report(0, BasicAuthProvider.NoKey, payload)
    expected_digest = hashlib.sha256(payload).digest()
    assert hmac.compare_digest(expected_digest, signed['signature'])

    full = provider.verify_report(
        0, BasicAuthProvider.NoKey, payload, expected_digest)
    assert full['verified']
    assert full['bit_length'] == 256

    short = provider.verify_report(
        0, BasicAuthProvider.NoKey, payload, expected_digest[:16])
    assert short['verified']
    assert short['bit_length'] == 128

    none = provider.verify_report(
        0, BasicAuthProvider.NoKey, payload, bytearray())
    assert not none['verified']
def valid_user(cookie_str):
    """Return the username for a valid session cookie, else None.

    The cookie has the form ``<session>|<hash>``. The hash is checked in
    constant time against ``cookie_hash(<session>)``; on success the user
    with that ``current_session`` is looked up. If the session has not
    expired, its expiry is pushed one hour ahead and the username returned.

    Args:
        cookie_str: Raw cookie value, or None.

    Returns:
        The username, or None when the cookie is absent, malformed, fails
        the hash check, matches no user, or the session has expired.
    """
    if cookie_str is None:
        return None

    cookie_parts = cookie_str.split("|")
    if len(cookie_parts) != 2:
        return None

    session_id, supplied_hash = cookie_parts
    if not hmac.compare_digest(str(supplied_hash), str(cookie_hash(session_id))):
        return None

    # Bind the value as a positional parameter rather than interpolating it
    # into the GQL text, so quote characters cannot alter the query.
    user_query = ndb.gql(
        "SELECT * FROM User WHERE current_session = :1", session_id)
    current_user = user_query.get()
    if current_user and datetime.datetime.now() < current_user.session_expires:
        current_user.session_expires = (
            datetime.datetime.now() + datetime.timedelta(hours=1))
        current_user.put()
        return current_user.username
    return None
def authentification():
    """Abort the current Flask request with 403 unless its webhook signature is valid.

    Only SHA1 signatures are supported: the ``X-Hub-Signature`` header must
    be ``sha1=<digest>`` and the digest must equal
    ``utils.compute_hmac(flask.request.data)``. Each rejection is logged as
    a warning before aborting.
    """
    header_signature = flask.request.headers.get('X-Hub-Signature')
    if header_signature is None:
        LOG.warning("Webhook without signature")
        flask.abort(403)

    pieces = header_signature.split('=')
    # Anything other than exactly "<name>=<digest>" counts as malformed.
    sha_name, signature = pieces if len(pieces) == 2 else (None, None)
    if sha_name != 'sha1':
        LOG.warning("Webhook signature malformed")
        flask.abort(403)

    expected = utils.compute_hmac(flask.request.data)
    if not hmac.compare_digest(expected, str(signature)):
        LOG.warning("Webhook signature invalid")
        flask.abort(403)
def verify_secure_value(name: str, value: Union[str, bytes], *, secret: str) -> Union[str, None]:
    """Verify a signed value and return its decoded payload.

    The value, as bytes, must consist of three ``|``-separated parts. The
    signature is regenerated from the secret, the name and the first two
    parts, and compared in constant time with the third part.

    Returns:
        The base64-decoded first part as text when the signature is correct,
        otherwise None.
    """
    parts = tob(value).split(b"|")
    if len(parts) != 3:
        return None

    encoded_payload, second_part, received_signature = parts
    expected_signature = _generate_signature(
        secret, name, encoded_payload, second_part)
    if hmac.compare_digest(received_signature, expected_signature):
        return touni(base64.b64decode(encoded_payload))
    return None
def verify_signature(secret: str, signature: str, resp_body: BytesIO) -> None:
    """Verify the HMAC-SHA1 signature of the given response body.

    The signature is expected to be in the format ``sha1=<hex-digest>``; it
    is lower-cased before parsing. Returns None when the signature is valid.

    Raises:
        InvalidSignatureError: If the signature cannot be parsed, names a
            type other than sha1, or does not match the body.
    """
    try:
        alg, digest = signature.lower().split('=', 1)
    except (ValueError, AttributeError):
        raise InvalidSignatureError('signature is malformed')

    if alg != 'sha1':
        raise InvalidSignatureError("expected type sha1, but got %s" % alg)

    mac = hmac.new(secret.encode('utf-8'),  # type: ignore
                   msg=resp_body.getbuffer(),
                   digestmod=hashlib.sha1)
    digests_match = hmac.compare_digest(mac.hexdigest(), digest)
    if not digests_match:
        raise InvalidSignatureError('digests do not match')
def compare_digest(a, b, _xor_bytes=_xor_bytes):
    """Compare two sequences without stopping at the first difference.

    When the lengths match, ``a`` is XOR-compared element by element with
    ``b``. When they differ, ``b`` is compared with itself and the result is
    forced to "not equal", so the loop still runs over ``len(b)`` elements.

    Returns:
        True only if both inputs have equal length and equal elements.
    """
    right = b
    if len(a) == len(b):
        left, result = a, 0
    else:
        # Length mismatch: walk b against itself and pre-mark as different.
        left, result = b, 1

    for x, y in zip(left, right):
        result |= _xor_bytes(x, y)
    return result == 0
def validate_github_webhook(request):
    """Check the ``X-Hub-Signature`` of a GitHub webhook request.

    The hex HMAC of ``request.body``, keyed with
    ``settings.GITHUB_WEBHOOK_SECRET`` and using the ``sha1`` digest, is
    compared in constant time with the part of the header after the first
    ``=``.

    Args:
        request: Request exposing ``META`` and ``body``.

    Returns:
        True if the signature matches; False if it does not or if the header
        is missing or has no ``=`` separator.
    """
    header = request.META.get('HTTP_X_HUB_SIGNATURE')
    # A missing or malformed header is an invalid request, not a server error.
    if not header or '=' not in header:
        return False
    signature = header.split('=')[1]

    key = settings.GITHUB_WEBHOOK_SECRET
    # HMAC keys must be bytes; encode any text key regardless of Python version.
    if not isinstance(key, bytes):
        key = key.encode()

    mac = hmac.new(key, msg=request.body, digestmod=sha1)
    return hmac.compare_digest(mac.hexdigest(), signature)
def check_password(self, password=None):
    """Check a password against the stored crypt hash.

    The candidate is hashed with ``crypt.crypt`` using the stored hash as
    the salt argument, and the result is compared in constant time with
    ``self.phash``.
    """
    stored_hash = self.phash
    candidate_hash = crypt.crypt(password, stored_hash)
    return hmac.compare_digest(stored_hash, candidate_hash)
def auth(secret, headers, raw_payload):
    """Authenticate a webhook via its ``X-Hub-Signature`` header.

    The expected header value is ``sha1=`` followed by the hex HMAC-SHA1 of
    the raw payload keyed with ``secret``. Returns None on success.

    Raises:
        KeyError: If the header is absent.
        WebhookAuthError: If the signature does not match.
    """
    received_sig = to_bytes(headers['X-Hub-Signature'])
    hex_digest = hmac.new(secret, raw_payload, hashlib.sha1).hexdigest()
    correct_sig = b'sha1=' + to_bytes(hex_digest)
    if hmac.compare_digest(received_sig, correct_sig):
        return
    raise WebhookAuthError()
def auth(secret, headers, raw_payload):
    """Authenticate a webhook via its ``X-Gitlab-Token`` header.

    The header value, converted to bytes, must equal ``secret``; the
    comparison is constant-time. ``raw_payload`` is not used. Returns None
    on success.

    Raises:
        KeyError: If the header is absent.
        WebhookAuthError: If the token does not match the secret.
    """
    received_token = to_bytes(headers['X-Gitlab-Token'])
    if hmac.compare_digest(secret, received_token):
        return
    raise WebhookAuthError()
def bytes_eq(a, b):
    """Compare two ``bytes`` objects in constant time.

    Raises:
        TypeError: If either argument is not a ``bytes`` instance.
    """
    if isinstance(a, bytes) and isinstance(b, bytes):
        return hmac.compare_digest(a, b)
    raise TypeError("a and b must be bytes.")
def verify(digest, digest_method, key, message):
    """Verify an HMAC digest.

    Signs ``message`` with ``key`` via ``sign(digest_method, ...)`` and
    compares the result, in constant time, with ``digest``; both sides are
    passed through ``want_bytes`` first.
    """
    expected = want_bytes(sign(digest_method, key, message))
    supplied = want_bytes(digest)
    return hmac.compare_digest(expected, supplied)
def constant_time_compare(val1, val2):
    """Compare two values in constant time after coercing both with ``force_bytes``."""
    left = force_bytes(val1)
    right = force_bytes(val2)
    return hmac.compare_digest(left, right)
def auth_token(self):
    """Authenticate ``self.password`` as a time-limited HMAC token.

    The password is translated with ``usersafe_encoding``, padded and
    base64-decoded into exactly 23 bytes: a 1-byte version (must be 0), a
    16-byte MAC and a 6-byte header holding a 2-byte secret ID and a 4-byte
    expiry (all big-endian). The token is accepted when the expiry is not
    before ``self.now`` and the MAC equals the first 16 bytes of the
    HMAC-SHA256, keyed with ``self.secret``, over version, header and the
    ``username@domain`` JID.

    Returns:
        bool: True if the token is valid, False otherwise (the reason is
        logged at debug level).
    """
    try:
        token = b64decode(self.password.translate(usersafe_encoding) + '=======')
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
        # are not mistaken for "not a token".
        logging.debug('Could not decode token (maybe not a token?)')
        return False

    jid = self.username + '@' + self.domain

    if len(token) != 23:
        logging.debug('Token is too short: %d != 23 (maybe not a token?)' % len(token))
        return False

    (version, mac, header) = unpack('> B 16s 6s', token)
    if version != 0:
        logging.debug('Wrong token version (maybe not a token?)')
        return False

    (secretID, expiry) = unpack('> H I', header)
    if expiry < self.now:
        logging.debug('Token has expired')
        return False

    challenge = pack('> B 6s %ds' % len(jid), version, header, jid)
    response = hmac.new(self.secret, challenge, hashlib.sha256).digest()

    # Only the first 16 bytes of the SHA-256 MAC are carried in the token.
    return hmac.compare_digest(mac, response[:16])
def constant_time_compare(val1, val2):
    """Return True if the two strings are equal, False otherwise.

    Both values are coerced with ``force_bytes`` and compared with
    ``hmac.compare_digest``.
    """
    first_bytes = force_bytes(val1)
    second_bytes = force_bytes(val2)
    return hmac.compare_digest(first_bytes, second_bytes)
def get_redirect_url(self, pk_encoded, sig):
    """Resolve a shortened-URL request to its redirect target.

    The encoded key is converted with ``base(pk_encoded, 62, 10, ...)`` and
    used to look up a ``ShortenedURL``; its stored signature is compared in
    constant time with ``sig``. Outcomes:

    * valid signature, not expired -> the stored URL
    * valid signature, expired     -> the configured "expired" URL
    * anything else                -> the configured "invalid" URL

    Every non-success outcome is logged through the matching ``log_*``
    method, and the final result through ``log_result``.

    Raises:
        Http404: If the resolved result is empty.
    """
    pk = base(pk_encoded, 62, 10, string=True)
    try:
        short_url = ShortenedURL.objects.get(pk=pk)
    except ShortenedURL.DoesNotExist:
        short_url = None

    valid_request = short_url and hmac.compare_digest(short_url.sig, sig)

    if valid_request:
        if short_url.is_expired:
            result = settings.PRIVATE_SHORTENER_REDIRECT_EXPIRED_URL
            self.log_expired_url(short_url)
        else:
            result = short_url.url
    else:
        result = settings.PRIVATE_SHORTENER_REDIRECT_INVALID_URL
        if short_url:
            # Record exists but the signature is wrong.
            self.log_invalid_sig(short_url, sig)
        else:
            # No record for this key at all.
            self.log_invalid_pk(pk_encoded)

    self.log_result(result)
    if not result:
        raise Http404
    return result
def has_permission(self, request, view):
    """Grant access when the request presents the configured API secret.

    If ``settings.PRIVATE_SHORTENER_API_SECRET_KEY`` is None, every request
    is permitted. Otherwise the ``HTTP_AUTHORIZATION`` value (empty string
    when absent) must equal the key; the comparison is constant-time.
    """
    expected_key = settings.PRIVATE_SHORTENER_API_SECRET_KEY
    if expected_key is None:
        return True
    supplied_key = request.META.get('HTTP_AUTHORIZATION', '')
    return bool(hmac.compare_digest(expected_key, supplied_key))
def validate_secret(header_signature, msg):
    """Validate a ``sha1=<hexdigest>`` webhook signature against ``msg``.

    The secret is read, stripped, from ``ghsecret.txt`` one directory above
    this file. The hex HMAC-SHA1 of ``msg`` keyed with that secret is
    compared in constant time with the digest from the header.

    Args:
        header_signature: Signature header value, or None.
        msg: Message bytes the signature was computed over.

    Returns:
        bool: True if the signature matches; False if the header is missing,
        malformed, not sha1, or does not match.
    """
    with open(os.path.join(os.path.dirname(__file__), "..", "ghsecret.txt")) as f:
        secret = f.read().strip()

    if header_signature is None:
        logger.error("Header signature missing")
        return False

    try:
        sha_name, signature = header_signature.split('=')
    except ValueError:
        # Not exactly "<name>=<digest>": reject instead of crashing.
        logger.error("Header signature malformed")
        return False

    if sha_name != 'sha1':
        logger.error("Header signature not signed with sha1")
        return False

    # HMAC requires the key to be bytes, but the file is read as text.
    key = secret if isinstance(secret, bytes) else secret.encode('utf-8')
    mac = hmac.new(key, msg=msg, digestmod=hashlib.sha1)

    # Get ours vs. theirs
    expected = str(mac.hexdigest())
    received = str(signature)

    # Timing attack secure comparison
    matches = hmac.compare_digest(expected, received)
    if not matches:
        # The expected digest is deliberately not logged: anyone able to
        # read the log could replay it as a valid signature.
        logger.error(
            "Header signature ({}) does not match expected value".format(received))
    return matches
def post(self, request, *args, **kwargs):
    """Handle a GitHub webhook POST and dispatch it to an event handler.

    The request must carry ``X-Hub-Signature`` (``sha1=<hexdigest>``) and
    ``X-GitHub-Event``. After the HMAC-SHA1 of the body is verified against
    the configured secret, the event name must be listed by
    ``get_allowed_events()`` and match a handler attribute on this object.
    The handler receives the decoded JSON payload and its return value is
    sent back as JSON.

    Returns:
        ``JsonResponse`` with the handler result, or ``HttpResponseBadRequest``
        describing the first failed check.

    Raises:
        ImproperlyConfigured: If no webhook secret is configured.
    """
    secret = self.get_secret()
    if not secret:
        raise ImproperlyConfigured('GitHub webhook secret ist not defined.')

    meta = request.META
    if 'HTTP_X_HUB_SIGNATURE' not in meta:
        return HttpResponseBadRequest('Request does not contain X-GITHUB-SIGNATURE header')
    if 'HTTP_X_GITHUB_EVENT' not in meta:
        return HttpResponseBadRequest('Request does not contain X-GITHUB-EVENT header')

    digest_name, signature = meta['HTTP_X_HUB_SIGNATURE'].split('=')
    if digest_name != 'sha1':
        return HttpResponseBadRequest(
            'Unsupported X-HUB-SIGNATURE digest mode found: {}'.format(digest_name))

    expected_digest = hmac.new(
        secret.encode('utf-8'),
        msg=request.body,
        digestmod=hashlib.sha1,
    ).hexdigest()
    if not hmac.compare_digest(expected_digest, signature):
        return HttpResponseBadRequest('Invalid X-HUB-SIGNATURE header found')

    event = meta['HTTP_X_GITHUB_EVENT']
    if event not in self.get_allowed_events():
        return HttpResponseBadRequest(
            'Unsupported X-GITHUB-EVENT header found: {}'.format(event))

    # Handlers are looked up by event name on this object.
    handler = getattr(self, event, None)
    if not handler:
        return HttpResponseBadRequest(
            'Unsupported X-GITHUB-EVENT header found: {}'.format(event))

    payload = json.loads(request.body.decode('utf-8'))
    return JsonResponse(handler(payload, request, *args, **kwargs))
def compare(a, b):
    """Compare two strings in a way that is not vulnerable to timing attacks.

    Uses ``hmac.compare_digest`` when ``HAVE_COMPARE_DIGEST`` is true. The
    fallback folds the length difference and every character difference into
    one accumulator, always iterating over all of ``b``.

    Args:
        a: First string.
        b: Second string.

    Returns:
        bool: True if both strings are equal.
    """
    if HAVE_COMPARE_DIGEST:
        return hmac.compare_digest(a, b)

    if len(a) == 0:
        # The loop indexes ``a`` modulo its length, which would divide by
        # zero here; an empty string only equals another empty string.
        return len(b) == 0

    result = len(a) ^ len(b)
    for i in range(len(b)):
        # Wrap around ``a`` so the loop length depends on ``b`` alone.
        result |= ord(a[i % len(a)]) ^ ord(b[i])
    return result == 0
def webhook_verify_signature(self, data, signature):
    """Verify request content against its signature.

    The expected signature is the hex HMAC-SHA1 of ``data`` keyed with the
    UTF-8 encoded ``self.webhooks_secret``.

    :param data: Request data to be verified
    :param signature: The signature of data
    :type signature: str
    :return: True if the content is verified
    :rtype: bool
    """
    key = self.webhooks_secret.encode('utf-8')
    expected = hmac.new(key, data, hashlib.sha1).hexdigest()
    return hmac.compare_digest(expected, signature)
def m4_receive_srp_verify_response(self, parsed_ktlvs: Dict[str, bytes]) -> None:
    """Verify the accessory's proof in the M4 message.

    The message state must be 4. The received proof is stored on ``self.M2``
    (converted with ``from_bytes``) and compared in constant time with
    ``H(self.A, self.M1, self.K)`` converted by ``to_bytes``.

    Raises:
        ValueError: If the message is not an M4 message or the proof does
            not match.
    """
    state = from_bytes(parsed_ktlvs['kTLVType_State'], False)
    if state != 4:
        raise ValueError(
            "Received wrong message for M4 {}".format(parsed_ktlvs))

    self.M2 = from_bytes(parsed_ktlvs['kTLVType_Proof'])
    expected_proof = H(self.A, self.M1, self.K)

    proof_ok = compare_digest(
        to_bytes(expected_proof), parsed_ktlvs['kTLVType_Proof'])
    if not proof_ok:
        raise ValueError("Authentication failed - invalid prood received.")
def check_password(self, password=None):
    """Tell whether ``password`` hashes to the stored ``self.phash``.

    ``crypt.crypt`` is called with the stored hash as its salt argument and
    the outcome is compared with the stored hash in constant time.
    """
    recomputed = crypt.crypt(password, self.phash)
    is_match = hmac.compare_digest(self.phash, recomputed)
    return is_match
def compare_digest(s1, s2):
    """Compare two digests without short-circuiting on the first mismatch.

    A plain ``==`` stops at the first differing element, which lets response
    timing reveal how much of a guess was right. For two text values or two
    ``bytes`` values every position is examined (the length check is still
    done up front). Any other combination of types keeps the ``==`` result.

    Args:
        s1: First value.
        s2: Second value.

    Returns:
        bool: True if the values are equal.
    """
    text_type = type(u'')
    if isinstance(s1, text_type) and isinstance(s2, text_type):
        s1 = s1.encode('utf-8')
        s2 = s2.encode('utf-8')
    if not (isinstance(s1, bytes) and isinstance(s2, bytes)):
        # Mixed or non-string operands: keep ordinary equality semantics.
        return s1 == s2

    if len(s1) != len(s2):
        return False
    diff = 0
    for x, y in zip(bytearray(s1), bytearray(s2)):
        diff |= x ^ y
    return diff == 0


# websocket supported version.
def _validate(headers, key, subprotocols):
    """Validate the server's WebSocket handshake response headers.

    Every header listed in ``_HEADERS_TO_CHECK`` must be present with the
    expected (case-insensitive) value. If ``subprotocols`` were requested,
    the server must select one of them. Finally ``sec-websocket-accept``
    must equal the base64 SHA-1 of ``key`` plus the WebSocket GUID; both
    sides are lower-cased before the comparison.

    Args:
        headers: Mapping of response header names to values.
        key: The key sent in the handshake request.
        subprotocols: Requested subprotocols, or a falsy value for none.

    Returns:
        ``(True, subprotocol)`` on success, where ``subprotocol`` is the
        lower-cased selection or None; ``(False, None)`` otherwise.
    """
    subproto = None
    for k, v in _HEADERS_TO_CHECK.items():
        r = headers.get(k, None)
        if not r:
            return False, None
        r = r.lower()
        if v != r:
            return False, None

    if subprotocols:
        subproto = headers.get("sec-websocket-protocol", None)
        # The header may be absent altogether; lower-case it only when set
        # so a missing header is reported instead of raising AttributeError.
        if subproto:
            subproto = subproto.lower()
        if not subproto or subproto not in [s.lower() for s in subprotocols]:
            error("Invalid subprotocol: " + str(subprotocols))
            return False, None

    result = headers.get("sec-websocket-accept", None)
    if not result:
        return False, None
    result = result.lower()

    if isinstance(result, six.text_type):
        result = result.encode('utf-8')

    value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
    hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()
    success = compare_digest(hashed, result)

    if success:
        return True, subproto
    else:
        return False, None
def compare_digest(a, b):
    """Run a constant-time comparison of two strings (adapted from Django).

    Returns True if ``a`` and ``b`` are equal. Both must be the same length,
    otherwise False is returned immediately. Elements that are already
    integers (as produced by iterating ``bytes`` on Python 3) are used as
    is; anything else is converted with ``ord``.

    Args:
        a: First text or bytes value.
        b: Second text or bytes value.

    Returns:
        bool: True if both values have the same length and content.
    """
    if len(a) != len(b):
        return False

    result = 0
    for ch_a, ch_b in zip(a, b):
        code_a = ch_a if isinstance(ch_a, int) else ord(ch_a)
        code_b = ch_b if isinstance(ch_b, int) else ord(ch_b)
        # Accumulate differences so every position is always examined.
        result |= code_a ^ code_b
    return result == 0