我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用hmac.new()。
def add_credentials_options(self): """Add credentials to the Options dictionary (if necessary).""" api_username = self.configuration.username api_key = self.configuration.password self.options['api_username'] = api_username if self.configuration.secure_auth == True: timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') uri = '/' + self.configuration.sub_url + ('/' if self.domain_name.strip()=='' else '/' + self.domain_name + '/') + self.service_name self.options['timestamp'] = timestamp params = ''.join([api_username, timestamp, uri]) self.options['signature'] = hmac.new(api_key, params, digestmod=hashlib.sha1).hexdigest() else: self.options['api_key'] = api_key
def check_signature(header_signature, request_body): if not header_signature: raise ValueError('No X-Hub-Signature header.') algorithm, signature_digest = header_signature.split('=') if algorithm != 'sha1': raise ValueError('Unsupported digest algorithm {}.'.format(algorithm)) body_digest = hmac.new( webhook_secret(), msg=request_body, digestmod=hashlib.sha1).hexdigest() if not hmac.compare_digest(body_digest, signature_digest): raise ValueError('Body digest did not match signature digest') return True
def request(self, method, request_uri, headers, content): """Modify the request headers""" keys = _get_end2end_headers(headers) keylist = "".join(["%s " % k for k in keys]) headers_val = "".join([headers[k] for k in keys]) created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime()) cnonce = _cnonce() request_digest = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, self.challenge['snonce'], headers_val) request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower() headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % ( self.credentials[0], self.challenge['realm'], self.challenge['snonce'], cnonce, request_uri, created, request_digest, keylist)
def generate_token(key, user_id, action_id='', when=None): """Generates a URL-safe token for the given user, action, time tuple. Args: key: secret key to use. user_id: the user ID of the authenticated user. action_id: a string identifier of the action they requested authorization for. when: the time in seconds since the epoch at which the user was authorized for this action. If not set the current time is used. Returns: A string XSRF protection token. """ digester = hmac.new(_to_bytes(key, encoding='utf-8')) digester.update(_to_bytes(str(user_id), encoding='utf-8')) digester.update(DELIMITER) digester.update(_to_bytes(action_id, encoding='utf-8')) digester.update(DELIMITER) when = _to_bytes(str(when or int(time.time())), encoding='utf-8') digester.update(when) digest = digester.digest() token = base64.urlsafe_b64encode(digest + DELIMITER + when) return token
def verify_request_signature(func): @wraps(func) def decorated(*args, **kwargs): signature = request.headers.get('x-hub-signature', None) if signature: elements = signature.split('=') method = elements[0] signature_hash = elements[1] expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest() if signature_hash != expected_hash: LOGGER.error('Signature was invalid') return make_response('', 403) else: LOGGER.error('Could not validate the signature') return func(*args, **kwargs) return decorated
def derive_key(self): """This method is called to derive the key. If you're unhappy with the default key derivation choices you can override them here. Keep in mind that the key derivation in itsdangerous is not intended to be used as a security method to make a complex key out of a short password. Instead you should use large random secret keys. """ salt = want_bytes(self.salt) if self.key_derivation == 'concat': return self.digest_method(salt + self.secret_key).digest() elif self.key_derivation == 'django-concat': return self.digest_method(salt + b'signer' + self.secret_key).digest() elif self.key_derivation == 'hmac': mac = hmac.new(self.secret_key, digestmod=self.digest_method) mac.update(salt) return mac.digest() elif self.key_derivation == 'none': return self.secret_key else: raise TypeError('Unknown key derivation method')
def getHMACFunc(key, hex=True): """Return a function that computes the HMAC of its input using the **key**. :param bool hex: If True, the output of the function will be hex-encoded. :rtype: callable :returns: A function which can be uses to generate HMACs. """ h = hmac.new(key, digestmod=DIGESTMOD) def hmac_fn(value): h_tmp = h.copy() h_tmp.update(value) if hex: return h_tmp.hexdigest() else: return h_tmp.digest() return hmac_fn
def fetch_request_token(self, oauth_request): """Processes a request_token request and returns the request token on success. """ try: # Get the request token for authorization. token = self._get_token(oauth_request, 'request') except OAuthError: # No token required for the initial token request. version = self._get_version(oauth_request) consumer = self._get_consumer(oauth_request) try: callback = self.get_callback(oauth_request) except OAuthError: callback = None # 1.0, no callback specified. self._check_signature(oauth_request, consumer, None) # Fetch a new token. token = self.data_store.fetch_request_token(consumer, callback) return token
def client_udp_pre_encrypt(self, buf): if self.user_key is None: if b':' in to_bytes(self.server_info.protocol_param): try: items = to_bytes(self.server_info.protocol_param).split(':') self.user_key = self.hashfunc(items[1]).digest() self.user_id = struct.pack('<I', int(items[0])) except: pass if self.user_key is None: self.user_id = os.urandom(4) self.user_key = self.server_info.key authdata = os.urandom(3) mac_key = self.server_info.key md5data = hmac.new(mac_key, authdata, self.hashfunc).digest() uid = struct.unpack('<I', self.user_id)[0] ^ struct.unpack('<I', md5data[:4])[0] uid = struct.pack('<I', uid) rand_len = self.udp_rnd_data_len(md5data, self.random_client) encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(self.user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4') out_buf = encryptor.encrypt(buf) buf = out_buf + os.urandom(rand_len) + authdata + uid return buf + hmac.new(self.user_key, buf, self.hashfunc).digest()[:1]
def server_udp_pre_encrypt(self, buf, uid): if uid in self.server_info.users: user_key = self.server_info.users[uid] else: uid = None if not self.server_info.users: user_key = self.server_info.key else: user_key = self.server_info.recv_iv authdata = os.urandom(7) mac_key = self.server_info.key md5data = hmac.new(mac_key, authdata, self.hashfunc).digest() rand_len = self.udp_rnd_data_len(md5data, self.random_server) encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4') out_buf = encryptor.encrypt(buf) buf = out_buf + os.urandom(rand_len) + authdata return buf + hmac.new(user_key, buf, self.hashfunc).digest()[:1]
def server_udp_post_decrypt(self, buf): mac_key = self.server_info.key md5data = hmac.new(mac_key, buf[-8:-5], self.hashfunc).digest() uid = struct.unpack('<I', buf[-5:-1])[0] ^ struct.unpack('<I', md5data[:4])[0] uid = struct.pack('<I', uid) if uid in self.server_info.users: user_key = self.server_info.users[uid] else: uid = None if not self.server_info.users: user_key = self.server_info.key else: user_key = self.server_info.recv_iv if hmac.new(user_key, buf[:-1], self.hashfunc).digest()[:1] != buf[-1:]: return (b'', None) rand_len = self.udp_rnd_data_len(md5data, self.random_client) encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4') out_buf = encryptor.decrypt(buf[:-8 - rand_len]) return (out_buf, uid)
def _oauth_get_user_future(self, access_token, callback): """Subclasses must override this to get basic information about the user. Should return a `.Future` whose result is a dictionary containing information about the user, which may have been retrieved by using ``access_token`` to make a request to the service. The access token will be added to the returned dictionary to make the result of `get_authenticated_user`. For backwards compatibility, the callback-based ``_oauth_get_user`` method is also supported. """ # By default, call the old-style _oauth_get_user, but new code # should override this method instead. self._oauth_get_user(access_token, callback)
def _oauth_signature(consumer_token, method, url, parameters={}, token=None): """Calculates the HMAC-SHA1 OAuth signature for the given request. See http://oauth.net/core/1.0/#signing_process """ parts = urlparse.urlparse(url) scheme, netloc, path = parts[:3] normalized_url = scheme.lower() + "://" + netloc.lower() + path base_elems = [] base_elems.append(method.upper()) base_elems.append(normalized_url) base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v))) for k, v in sorted(parameters.items()))) base_string = "&".join(_oauth_escape(e) for e in base_elems) key_elems = [escape.utf8(consumer_token["secret"])] key_elems.append(escape.utf8(token["secret"] if token else "")) key = b"&".join(key_elems) hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1) return binascii.b2a_base64(hash.digest())[:-1]
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None): """Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request. See http://oauth.net/core/1.0a/#signing_process """ parts = urlparse.urlparse(url) scheme, netloc, path = parts[:3] normalized_url = scheme.lower() + "://" + netloc.lower() + path base_elems = [] base_elems.append(method.upper()) base_elems.append(normalized_url) base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v))) for k, v in sorted(parameters.items()))) base_string = "&".join(_oauth_escape(e) for e in base_elems) key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))] key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else "")) key = b"&".join(key_elems) hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1) return binascii.b2a_base64(hash.digest())[:-1]
def create_template_loader(self, template_path): """Returns a new template loader for the given path. May be overridden by subclasses. By default returns a directory-based loader on the given path, using the ``autoescape`` and ``template_whitespace`` application settings. If a ``template_loader`` application setting is supplied, uses that instead. """ settings = self.application.settings if "template_loader" in settings: return settings["template_loader"] kwargs = {} if "autoescape" in settings: # autoescape=None means "no escaping", so we have to be sure # to only pass this kwarg if the user asked for it. kwargs["autoescape"] = settings["autoescape"] if "template_whitespace" in settings: kwargs["whitespace"] = settings["template_whitespace"] return template.Loader(template_path, **kwargs)
def _get_raw_xsrf_token(self): """Read or generate the xsrf token in its raw form. The raw_xsrf_token is a tuple containing: * version: the version of the cookie from which this token was read, or None if we generated a new token in this request. * token: the raw token data; random (non-ascii) bytes. * timestamp: the time this token was generated (will not be accurate for version 1 cookies) """ if not hasattr(self, '_raw_xsrf_token'): cookie = self.get_cookie("_xsrf") if cookie: version, token, timestamp = self._decode_xsrf_token(cookie) else: version, token, timestamp = None, None, None if token is None: version = None token = os.urandom(16) timestamp = time.time() self._raw_xsrf_token = (version, token, timestamp) return self._raw_xsrf_token
def salted_hmac(key_salt, value, secret=None): """ Returns the HMAC-SHA1 of 'value', using a key generated from key_salt and a secret (which defaults to settings.SECRET_KEY). A different key_salt should be passed in for every application of HMAC. """ if secret is None: secret = settings.SECRET_KEY key_salt = force_bytes(key_salt) secret = force_bytes(secret) # We need to generate a derived key from our base key. We can do this by # passing the key_salt and our base key through a pseudo-random function and # SHA1 works nicely. key = hashlib.sha1(key_salt + secret).digest() # If len(key_salt + secret) > sha_constructor().block_size, the above # line is redundant and could be replaced by key = key_salt + secret, since # the hmac module does the same thing for keys longer than the block size. # However, we need to ensure that we *always* do this. return hmac.new(key, msg=force_bytes(value), digestmod=hashlib.sha1)
def store_mfa(user, seed, region, account): SSM_CLIENT = boto3.client('ssm') KMS_CLIENT = boto3.client('kms') response = KMS_CLIENT.describe_key( KeyId='alias/MFAUser', ) keyArn = response['KeyMetadata']['Arn'] try: response = SSM_CLIENT.put_parameter( Name='mfa-' + user, Description='MFA token seed', Value=seed, Type='SecureString', KeyId=keyArn, Overwrite=True ) mfa_store_policy(user, region, account) print("Token stored in Parameter Store") except Exception as e: print("Failed to store seed. You will need to retrieve it from the used log DDB or create a new token manually.") response = "Fail" return response
def deleteUser(userName, SN): try: response = client.deactivate_mfa_device( UserName=userName, SerialNumber=SN ) print("MFA device deactivated, trying to delete device.") except: print("Unable to deactivate MFA token. Could be that it's not created.") try: response = client.delete_virtual_mfa_device( SerialNumber=SN ) print("MFA device deleted, trying to delete new user.") except: print("Unable to delete MFA token. Could be that it's not created.") try: response = client.delete_user( UserName=userName ) print("User deleted") except: print("Unable to delete user: " + userName) return True
def generate_token(seed): """Summary Args: seed (TYPE): Description Returns: TYPE: Description """ seed = base64.b32decode(seed, True) hmacHash = hmac.new( seed, struct.pack( ">Q", int( time.time() // 30)), hashlib.sha1).digest() hashOffset = ord(hmacHash[19]) & 0xf token = (struct.unpack( ">I", hmacHash[hashOffset:hashOffset + 4])[0] & 0x7fffffff) % 10 ** 6 return token
def build_hmac_sha1(pkt, pw = '\0' * 20, ip4l = [], ip6l = []): if not pkt.haslayer(CARP): return None p = pkt[CARP] h = hmac.new(pw, digestmod = hashlib.sha1) # XXX: this is a dirty hack. it needs to pack version and type into a single 8bit field h.update('\x21') # XXX: mac addy if different from special link layer. comes before vhid h.update(struct.pack('!B', p.vhid)) sl = [] for i in ip4l: # sort ips from smallest to largest sl.append(inet_aton(i)) sl.sort() for i in sl: h.update(i) # XXX: do ip6l sorting return h.digest()
def post(self): """ Creates new post and redirect to new post page. """ if not self.user: return self.redirect('/blog') subject = self.request.get('subject') content = self.request.get('content') if subject and content: p = Post(parent=blog_key(), user_id=self.user.key().id(), subject=subject, content=content) p.put() self.redirect('/blog/%s' % str(p.key().id())) else: error = "subject and content, please!" self.render("newpost.html", subject=subject, content=content, error=error)
def gen_user_breadcrumb(size): """ Used in comments posting. :param size: :return: """ key = 'iN4$aGr0m' dt = int(time.time() * 1000) # typing time elapsed time_elapsed = randint(500, 1500) + size * randint(500, 1500) text_change_event_count = max(1, size / randint(3, 5)) data = '{size!s} {elapsed!s} {count!s} {dt!s}'.format(**{ 'size': size, 'elapsed': time_elapsed, 'count': text_change_event_count, 'dt': dt }) return '{0!s}\n{1!s}\n'.format( base64.b64encode(hmac.new(key.encode('ascii'), data.encode('ascii'), digestmod=hashlib.sha256).digest()), base64.b64encode(data.encode('ascii')))
def kciCloudHelper(iCloudKey): #this function is tailored to keychaindump. Takes an iCloud key, and returns tokens msg = base64.b64decode(iCloudKey) key = "t9s\"lx^awe.580Gj%'ld+0LG<#9xa?>vb)-fkwb92[}" hashed = hmac.new(key, msg, digestmod=hashlib.md5).digest() hexedKey = binascii.hexlify(hashed) IV = 16 * '0' mme_token_file = glob("/Users/%s/Library/Application Support/iCloud/Accounts/*" % get_bella_user()) #this doesnt need to be globber bc only current user's info can be decrypted for x in mme_token_file: try: int(x.split("/")[-1]) mme_token_file = x except ValueError: continue send_msg("\t%sDecrypting token plist\n\t [%s]\n" % (blue_star, mme_token_file), False) decryptedBinary = subprocess.check_output("openssl enc -d -aes-128-cbc -iv '%s' -K %s < '%s'" % (IV, hexedKey, mme_token_file), shell=True) from Foundation import NSData, NSPropertyListSerialization binToPlist = NSData.dataWithBytes_length_(decryptedBinary, len(decryptedBinary)) token_plist = NSPropertyListSerialization.propertyListWithData_options_format_error_(binToPlist, 0, None, None)[0] tokz = "[%s | %s]\n" % (token_plist["appleAccountInfo"]["primaryEmail"], token_plist["appleAccountInfo"]["fullName"]) tokz += "%s:%s\n" % (token_plist["appleAccountInfo"]["dsPrsID"], token_plist["tokens"]["mmeAuthToken"]) return tokz
def auth_sub_string_from_body(http_body): """Extracts the AuthSub token from an HTTP body string. Used to find the new session token after making a request to upgrade a single use AuthSub token. Args: http_body: str The repsonse from the server which contains the AuthSub key. For example, this function would find the new session token from the server's response to an upgrade token request. Returns: The raw token value string to use in an AuthSubToken object. """ for response_line in http_body.splitlines(): if response_line.startswith('Token='): # Strip off Token= and return the token value string. return response_line[6:] return None
def generate_request_for_access_token( request_token, auth_server_url=ACCESS_TOKEN_URL): """Creates a request to ask the OAuth server for an access token. Requires a request token which the user has authorized. See the documentation on OAuth with Google Data for more details: http://code.google.com/apis/accounts/docs/OAuth.html#AccessToken Args: request_token: An OAuthHmacToken or OAuthRsaToken which the user has approved using their browser. auth_server_url: (optional) The URL at which the OAuth access token is requested. Defaults to https://www.google.com/accounts/OAuthGetAccessToken Returns: A new HttpRequest object which can be sent to the OAuth server to request an OAuth Access Token. """ http_request = atom.http_core.HttpRequest(auth_server_url, 'POST') http_request.headers['Content-Length'] = '0' return request_token.modify_request(http_request)
def upgrade_to_access_token(request_token, server_response_body): """Extracts access token information from response to an upgrade request. Once the server has responded with the new token info for the OAuth access token, this method modifies the request_token to set and unset necessary fields to create valid OAuth authorization headers for requests. Args: request_token: An OAuth token which this function modifies to allow it to be used as an access token. server_response_body: str The server's response to an OAuthAuthorizeToken request. This should contain the new token and token_secret which are used to generate the signature and parameters of the Authorization header in subsequent requests to Google Data APIs. Returns: The same token object which was passed in. """ token, token_secret = oauth_token_info_from_body(server_response_body) request_token.token = token request_token.token_secret = token_secret request_token.auth_state = ACCESS_TOKEN request_token.next = None request_token.verifier = None return request_token
def generate_token(key, user_id, action_id='', when=None): """Generates a URL-safe token for the given user, action, time tuple. Args: key: secret key to use. user_id: the user ID of the authenticated user. action_id: a string identifier of the action they requested authorization for. when: the time in seconds since the epoch at which the user was authorized for this action. If not set the current time is used. Returns: A string XSRF protection token. """ digester = hmac.new(_helpers._to_bytes(key, encoding='utf-8')) digester.update(_helpers._to_bytes(str(user_id), encoding='utf-8')) digester.update(DELIMITER) digester.update(_helpers._to_bytes(action_id, encoding='utf-8')) digester.update(DELIMITER) when = _helpers._to_bytes(str(when or int(time.time())), encoding='utf-8') digester.update(when) digest = digester.digest() token = base64.urlsafe_b64encode(digest + DELIMITER + when) return token
def generate_csrf_token(self, csrf_context): if self.SECRET_KEY is None: raise Exception('must set SECRET_KEY in a subclass of this form for it to work') if csrf_context is None: raise TypeError('Must provide a session-like object as csrf context') session = getattr(csrf_context, 'session', csrf_context) if 'csrf' not in session: session['csrf'] = sha1(os.urandom(64)).hexdigest() self.csrf_token.csrf_key = session['csrf'] if self.TIME_LIMIT: expires = (datetime.now() + self.TIME_LIMIT).strftime(self.TIME_FORMAT) csrf_build = '%s%s' % (session['csrf'], expires) else: expires = '' csrf_build = session['csrf'] hmac_csrf = hmac.new(self.SECRET_KEY, csrf_build.encode('utf8'), digestmod=sha1) return '%s##%s' % (expires, hmac_csrf.hexdigest())
def validate_csrf_token(self, field): if not field.data or '##' not in field.data: raise ValidationError(field.gettext('CSRF token missing')) expires, hmac_csrf = field.data.split('##') check_val = (field.csrf_key + expires).encode('utf8') hmac_compare = hmac.new(self.SECRET_KEY, check_val, digestmod=sha1) if hmac_compare.hexdigest() != hmac_csrf: raise ValidationError(field.gettext('CSRF failed')) if self.TIME_LIMIT: now_formatted = datetime.now().strftime(self.TIME_FORMAT) if now_formatted > expires: raise ValidationError(field.gettext('CSRF token expired'))
def validate_csrf_token(self, form, field): meta = self.form_meta if not field.data or '##' not in field.data: raise ValidationError(field.gettext('CSRF token missing')) expires, hmac_csrf = field.data.split('##', 1) check_val = (self.session['csrf'] + expires).encode('utf8') hmac_compare = hmac.new(meta.csrf_secret, check_val, digestmod=sha1) if hmac_compare.hexdigest() != hmac_csrf: raise ValidationError(field.gettext('CSRF failed')) if self.time_limit: now_formatted = self.now().strftime(self.TIME_FORMAT) if now_formatted > expires: raise ValidationError(field.gettext('CSRF token expired'))
def generate_csrf_token(self, csrf_token_field): meta = self.form_meta if meta.csrf_secret is None: raise Exception('must set `csrf_secret` on class Meta for SessionCSRF to work') if meta.csrf_context is None: raise TypeError('Must provide a session-like object as csrf context') session = self.session if 'csrf' not in session: session['csrf'] = sha1(os.urandom(64)).hexdigest() if self.time_limit: expires = (self.now() + self.time_limit).strftime(self.TIME_FORMAT) csrf_build = '%s%s' % (session['csrf'], expires) else: expires = '' csrf_build = session['csrf'] hmac_csrf = hmac.new(meta.csrf_secret, csrf_build.encode('utf8'), digestmod=sha1) return '%s##%s' % (expires, hmac_csrf.hexdigest())