我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用hashlib.sha256()。
def solve(x):
    """Recompute the 4-byte checksum of a Base58 string and re-encode it.

    The input is Base58-decoded, its last four bytes (the old checksum) are
    dropped, and a fresh checksum -- the first four bytes of
    SHA-256(SHA-256(payload)) -- is appended before encoding again.

    :param x: Base58-encoded string whose trailing checksum is to be replaced.
    :returns: the result of ``base58.b58encode`` on payload + new checksum.
    """
    decoded = base58.b58decode(x)
    payload = decoded[:-4]
    # Double SHA-256; only the first four bytes serve as the checksum.
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    return base58.b58encode(payload + checksum)
def GetIntegrity(self, data):
    """Calculate the integrity value of given data using remote peers hash.

    ``self.hash_type`` selects the algorithm: 0 = SHA-1, 1 = SHA-224,
    2 = SHA-256, 3 = SHA-384, 4 = SHA-512.

    :param data: bytes to hash.
    :returns: the raw digest, or ``None`` when ``self.hash_type`` is ``None``
        or is not one of the codes listed above.
    """
    if self.hash_type is None:
        return None

    # The position in this tuple is the numeric hash-type code.
    algorithms = (
        hashlib.sha1,
        hashlib.sha224,
        hashlib.sha256,
        hashlib.sha384,
        hashlib.sha512,
    )
    for code, algorithm in enumerate(algorithms):
        if self.hash_type == code:
            return algorithm(data).digest()
    return None
def get_query_hash(uri, method, query_params=None):
    """Compute the query string hash (qsh) of a request.

    The canonical string is ``METHOD&uri&k1=v1&k2=v2...`` with the parameters
    sorted by key, the ``jwt`` parameter left out, and list values joined
    with commas after percent-encoding each element.

    :param uri: request path; trailing slashes are stripped.
    :param method: HTTP method, upper-cased before hashing.
    :param query_params: optional mapping of parameter name to a value or a
        list of values.
    :returns: hex SHA-256 digest of the UTF-8 encoded canonical string.
    """
    # see
    # https://developer.atlassian.com/static/connect/docs/latest/concepts/understanding-jwt.html#qsh
    uri = uri.rstrip('/')
    method = method.upper()
    if query_params is None:
        query_params = {}

    sorted_query = []
    for k, v in sorted(query_params.items()):
        # don't include jwt query param
        if k == 'jwt':
            continue
        if isinstance(v, list):
            # ``join`` is a str method; the list itself has no ``join``.
            param_val = ','.join(percent_encode(val) for val in v)
        else:
            param_val = percent_encode(v)
        sorted_query.append('%s=%s' % (percent_encode(k), param_val))

    query_string = '%s&%s&%s' % (method, uri, '&'.join(sorted_query))
    return hashlib.sha256(query_string.encode('utf8')).hexdigest()
def hash_Dk(s):
    '''
    Hash ``s`` to a list of length 512 holding exactly 64 entries equal to
    +1 or -1 (all other entries are 0).

    The 256-bit SHA-256 digest is read as 64 nibbles. For nibble ``g`` the
    low three bits choose a slot inside the 8-wide window starting at
    ``8 * g`` and the high bit chooses the sign (0 -> +1, 1 -> -1).

    :param s: bytes to hash; text is encoded as UTF-8 first.
    :returns: list of 512 ints drawn from {-1, 0, 1}.
    '''
    if not isinstance(s, bytes):
        s = s.encode('utf-8')

    # Zero-pad to the full 256 bits: bin() drops leading zeros, which would
    # shift every nibble whenever the digest starts with a 0 bit.
    bits = format(int(hashlib.sha256(s).hexdigest(), 16), '0256b')

    oparray = [0] * 512
    for group in range(64):
        nibble = bits[4 * group:4 * group + 4]
        slot = 8 * group + int(nibble[1:], 2)
        oparray[slot] = 1 if nibble[0] == '0' else -1
    return oparray
def check_login(login, password):
    """Check a login/password pair against the ``users`` table.

    The stored salt for ``login`` is appended to ``password``, the result is
    hashed with SHA-256 and compared with the stored password hash.

    :param login: user login.
    :param password: clear-text password.
    :returns: the user's id on success, ``False`` for empty credentials, an
        unknown login or a wrong password.
    """
    if login == '' or password == '':
        return False

    g.db = connect_db()
    try:
        # Bound parameters keep user input out of the SQL text.
        cur = g.db.execute('SELECT salt FROM users WHERE login = ?', (login,))
        salt = cur.fetchone()
        if not salt:
            # unsalted password or invalid login
            return False

        hashed = sha256((password + salt[0]).encode()).hexdigest()
        cur = g.db.execute(
            'SELECT id FROM users WHERE login = ? AND password = ?',
            (login, hashed))
        uid = cur.fetchone()
    finally:
        # Close the connection on every path, including errors.
        g.db.close()

    if uid:
        return uid[0]
    return False


##
# Change password
#
def get_random_string(length=12,
                      allowed_chars='abcdefghijklmnopqrstuvwxyz'
                                    'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'):
    """
    Return a random string of ``length`` characters drawn from
    ``allowed_chars``.

    The default length of 12 with the a-z, A-Z, 0-9 character set returns
    a 71-bit value. log_2((26+26+10)^12) =~ 71 bits
    """
    if not using_sysrandom:
        # Ugly hack, but better than predictability: re-seed the PRNG on
        # every call with a value an attacker will find hard to guess. It
        # may slightly change the properties of the random sequence, which
        # is still preferable to a fully predictable one.
        seed_material = "%s%s" % (random.getstate(), time.time())
        random.seed(hashlib.sha256(seed_material.encode('utf-8')).digest())

    picked = [random.choice(allowed_chars) for _ in range(length)]
    return ''.join(picked)
def set_login_id(self):
    """Session id Creator

    Creates a session id and writes it to the database.

    The id is the hex SHA-256 of 32 bytes from the operating system's random
    source. A digest of the current time alone can be guessed by anyone who
    knows roughly when the login happened.

    Returns:
        str. The id of the session.
    """
    import os  # local import: only needed for os.urandom here

    _logger.debug("Setting login token.")
    session_id = hashlib.sha256(os.urandom(32)).hexdigest()
    _logger.debug("Exipres in 2 hours")
    expiry = int(time.time()) + 7200  # All cookies expire in 2 hours
    _logger.debug("calling DB to add login token to database")
    self.db("add_login_id", session_id, expiry)
    # NOTE(review): this writes the live session id to the debug log --
    # confirm that is acceptable for the deployment.
    _logger.debug("Session ID is {0}".format(session_id))
    return session_id
def _make_flow(request, scopes, return_url=None):
    """Create a Web Server Flow and store it in the session.

    A random CSRF token is saved in the session under ``_CSRF_KEY`` and is
    embedded, together with ``return_url``, in the JSON ``state`` handed to
    the flow. The pickled flow is saved in the session under a key derived
    from the token.

    :param request: request object exposing ``session`` and
        ``build_absolute_uri``.
    :param scopes: scopes passed through to the flow.
    :param return_url: optional URL carried in ``state``.
    :returns: the created ``client.OAuth2WebServerFlow``.
    """
    # Generate a CSRF token to prevent malicious requests.
    token = hashlib.sha256(os.urandom(1024)).hexdigest()
    request.session[_CSRF_KEY] = token

    serialized_state = json.dumps({
        'csrf_token': token,
        'return_url': return_url,
    })

    oauth_flow = client.OAuth2WebServerFlow(
        client_id=django_util.oauth2_settings.client_id,
        client_secret=django_util.oauth2_settings.client_secret,
        scope=scopes,
        state=serialized_state,
        redirect_uri=request.build_absolute_uri(
            urlresolvers.reverse("google_oauth:callback")))

    session_key = _FLOW_KEY.format(token)
    request.session[session_key] = pickle.dumps(oauth_flow)
    return oauth_flow
def setUpClass(cls):
    """Create a temporary ``test.txt`` and record its URL and digests.

    Sets ``cls.url`` (a ``file://`` URL), ``cls.urlSha1`` and
    ``cls.urlSha256`` (digests passed through ``asHexStr``). The temporary
    directory object is kept on the class so it outlives this method.
    """
    cls.__repodir = tempfile.TemporaryDirectory()
    path = os.path.join(cls.__repodir.name, "test.txt")
    cls.url = "file://" + path

    with open(path, "w") as out:
        out.write("Hello world!")

    with open(path, "rb") as src:
        sha1 = hashlib.sha1()
        sha1.update(src.read())
    cls.urlSha1 = asHexStr(sha1.digest())

    with open(path, "rb") as src:
        sha256 = hashlib.sha256()
        sha256.update(src.read())
    cls.urlSha256 = asHexStr(sha256.digest())
def newcursor(self, dictcursor=False):
    '''
    Create a DB cursor on the current connection and register it in
    ``self.cursors`` under a randomly generated handle.

    dictcursor = True -> the cursor is created with
    ``psycopg2.extras.DictCursor`` as its factory, so rows can be addressed
    by column name.

    Returns a tuple ``(cursor, handle)``.
    '''
    handle = hashlib.sha256(os.urandom(12)).hexdigest()

    cursor_options = {}
    if dictcursor:
        cursor_options['cursor_factory'] = psycopg2.extras.DictCursor

    self.cursors[handle] = self.connection.cursor(**cursor_options)
    return (self.cursors[handle], handle)
def generate_keys(d, n):
    '''Print the 6.x and 7.x keys generated by using an exponent/modulus.

    :param d: exponent, as an int or a hexadecimal string.
    :param n: modulus, as an int or a hexadecimal string.

    The value ``asn1_prefix`` + SHA-256(keydata) is raised to ``d`` modulo
    ``n``, formatted as 512 hex digits and hashed again. The two halves of
    that final hex digest are printed as the two keys.
    '''
    # Generate ASN.1 data
    asn1 = asn1_prefix + hashlib.sha256(binascii.unhexlify(keydata)).hexdigest()
    asn1 = int(asn1, 16)

    if isinstance(d, str):
        d = int(d, 16)
    if isinstance(n, str):
        n = int(n, 16)

    # Exponentiate it.
    keys = hashlib.sha256(
        binascii.unhexlify('%0512X' % pow(asn1, d, n))).hexdigest().upper()
    print('6.X 0x2F KeyY: %s' % keys[:0x20])
    print('7.x 0x25 KeyX: %s' % keys[0x20:])
def string_2_int(string):
    """Return an integer derived from a string.

    The string is hashed with SHA-256, the hex digest is parsed as a
    base-36 integer, and that value modulo 10240 is returned.

    :param string: string to retrieve an int from
    :type string: ``str``
    :returns: ``int``
    """
    try:
        payload = string.encode('utf-8')
    except AttributeError:
        # No ``encode`` method: hash the value as it is.
        payload = string

    digest_text = hashlib.sha256(payload).hexdigest()
    return int(digest_text, 36) % 10240
def package():
    """Build the dictionary of helpers handed out under fixed names.

    :returns: dict mapping each name to a callable or object. It includes a
        locally defined ``has_requests`` predicate, a SHA-256 hex ``hash``
        helper and a ``url_encode`` helper that quotes every character.
    """
    def has_requests(project, branch):
        return len(get_requests(project, branch)) > 0

    return {
        'get_requests': get_requests,
        'has_requests': has_requests,
        'get_log_diff': get_log_diff,
        'last_modified': last_modified,
        'get_branch_by_name': get_branch_by_name,
        'hash': lambda x: hashlib.sha256(x).hexdigest(),
        '_': _,
        'url_encode': lambda x: urllib.quote(x, safe=''),
        'current_user': current_user,
        'floor': math.floor,
        'len': len,
        'getattr': getattr,
        'commit_diff': commit_diff,
    }
def _calculate_tx_hash(self):
    """
    Calculates sha-256 hash of transaction (source, destination, amount,
    fee, timestamp, signature)

    The fields are serialized as JSON with sorted keys, so the hash does
    not depend on dictionary ordering.

    :return: sha-256 hash
    :rtype: str
    """
    data = {
        "source": self._source,
        "destination": self._destination,
        "amount": self._amount,
        "fee": self._fee,
        "timestamp": self._timestamp,
        "signature": self._signature
    }
    data_json = json.dumps(data, sort_keys=True)
    # hashlib needs bytes; json.dumps returns text on Python 3.
    hash_object = hashlib.sha256(data_json.encode('utf-8'))
    return hash_object.hexdigest()
def __call__(self):
    """Run the parent context and restart apache2 if the SSL paths changed.

    ``CA_CERT_PATH`` and ``self.ssl_dir`` are hashed before and after the
    parent ``__call__``. A differing hash triggers
    ``service_restart('apache2')``.

    :returns: whatever the parent ``__call__`` returns.
    """
    # late import to work around circular dependency
    from keystone_utils import (
        determine_ports,
        update_hash_from_path,
    )

    def snapshot(paths):
        digest = hashlib.sha256()
        for entry in paths:
            update_hash_from_path(digest, entry)
        return digest.hexdigest()

    watched = [CA_CERT_PATH, self.ssl_dir]
    self.external_ports = determine_ports()

    digest_before = snapshot(watched)
    result = super(ApacheSSLContext, self).__call__()
    digest_after = snapshot(watched)

    # Ensure that apache2 is restarted if these change
    if digest_before != digest_after:
        service_restart('apache2')

    return result
def test_ssh_port_forwarding(master):
    '''
    Test SSH port forwarding feature.
    PR: https://github.com/saltstack/salt/pull/38021

    A message derived from the current time is sent through the forwarded
    port and must appear in the file written by the socket server.
    '''
    # hashlib needs bytes, so encode the timestamp text first.
    msg = hashlib.sha256(str(time.time()).encode('utf-8')).hexdigest()
    nc = "/salt-toaster/tests/scripts/netsend.sh"
    of = "/tmp/socket-8888.txt"
    loc_port = 8888
    rem_port = 9999

    master['container'].run(
        "/salt-toaster/tests/scripts/socket_server.py {lp} {of}".format(
            lp=loc_port, of=of))
    master.salt_ssh(
        "--remote-port-forwards={rp}:127.0.0.1:{lp} cmd.run '{nc} {msg} {rp}'".format(
            nc=nc, msg=msg, lp=loc_port, rp=rem_port)
    )
    assert master['container'].run("cat {}".format(of)).strip() == msg
def __init__(self, config_file, diffid_to_blobsum, blobsum_to_unzipped,
             blobsum_to_zipped, blobsum_to_legacy):
    """Store the inputs and build the schema 2 manifest JSON.

    :param config_file: JSON text of the image config. Its
        ``rootfs.diff_ids`` list drives the manifest's layer list.
    :param diffid_to_blobsum: mapping from diff id to blob sum, used only
        while building the manifest.
    :param blobsum_to_unzipped: stored on the instance as given.
    :param blobsum_to_zipped: stored on the instance as given.
    :param blobsum_to_legacy: stored on the instance as given.
    """
    self._config = config_file
    self._blobsum_to_unzipped = blobsum_to_unzipped
    self._blobsum_to_zipped = blobsum_to_zipped
    self._blobsum_to_legacy = blobsum_to_legacy

    parsed_config = json.loads(self._config)
    config_bytes = self.config_file().encode('utf-8')

    config_descriptor = {
        'mediaType': docker_http.CONFIG_JSON_MIME,
        'size': len(config_bytes),
        'digest': 'sha256:' + hashlib.sha256(config_bytes).hexdigest()
    }

    # One descriptor per diff id, in the order the config lists them.
    layer_descriptors = []
    for diff_id in parsed_config['rootfs']['diff_ids']:
        layer_descriptors.append({
            'mediaType': docker_http.LAYER_MIME,
            'size': self.blob_size(diffid_to_blobsum[diff_id]),
            'digest': diffid_to_blobsum[diff_id]
        })

    self._manifest = json.dumps({
        'schemaVersion': 2,
        'mediaType': docker_http.MANIFEST_SCHEMA2_MIME,
        'config': config_descriptor,
        'layers': layer_descriptors
    }, sort_keys=True)
def authorize_travis(f):
    """Decorator that authorizes a notification request before calling ``f``.

    The request must carry a ``payload`` form field (JSON) and an
    ``Authorization`` header equal to the hex SHA-256 of
    ``"<owner_name>/<name>" + TRAVIS_TOKEN``. On success ``f`` is called
    with the parsed payload as its first argument.
    """
    import hmac  # local import: only this decorator needs it

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if 'payload' not in request.form:
            abort(requests.codes.bad, 'Expected a payload field with the json payload')

        payload = json.loads(request.form.get('payload'))
        token = request.headers.get('Authorization', None)
        repository = payload['repository']
        logging.info('Handling notification for repo: {0}/{1} commit {2}'.format(
            repository['owner_name'], repository['name'], payload['commit']))

        if not token:
            abort(requests.codes.forbidden, 'A token is required')

        repository = '{0}/{1}'.format(payload['repository']['owner_name'],
                                      payload['repository']['name'])
        expected_token = sha256((repository + TRAVIS_TOKEN).encode('utf-8')).hexdigest()

        # Constant-time comparison, so response timing does not reveal how
        # many leading characters of the token were right.
        tokens_match = hmac.compare_digest(
            token.encode('utf-8'), expected_token.encode('utf-8'))
        if not tokens_match:
            abort(requests.codes.unauthorized, 'Invalid token')

        return f(payload, *args, **kwargs)
    return wrapper
def _auth(self, password):
    """Authenticate over the websocket when the server requires it.

    Sends ``GetAuthRequired``. If the reply's ``authRequired`` is truthy,
    an ``Authenticate`` request follows, carrying
    base64(sha256(base64(sha256(password + salt)) + challenge)).

    :param password: clear-text password.
    :raises exceptions.ConnectionFailure: when the authentication reply's
        ``status`` is not ``'ok'``.
    """
    probe = {"request-type": "GetAuthRequired", "message-id": str(self.id)}
    self.id += 1
    self.ws.send(json.dumps(probe))
    reply = json.loads(self.ws.recv())

    if not reply['authRequired']:
        return

    salted = (password + reply['salt']).encode('utf-8')
    secret = base64.b64encode(hashlib.sha256(salted).digest())
    challenged = secret + reply['challenge'].encode('utf-8')
    auth = base64.b64encode(hashlib.sha256(challenged).digest()).decode('utf-8')

    request = {"request-type": "Authenticate", "message-id": str(self.id), "auth": auth}
    self.id += 1
    self.ws.send(json.dumps(request))
    reply = json.loads(self.ws.recv())
    if reply['status'] != 'ok':
        raise exceptions.ConnectionFailure(reply['error'])
def add_account(self, name, secret_code, image):
    """
    Add an account to accounts table

    The SHA-256 hex digest of the secret code goes into the database row.
    The secret code itself is passed to the ``GK`` keyring calls.

    :param name: (str) account name
    :param secret_code: (str) ASCII Secret code
    :param image: image path or icon name
    :return: ``[uid, name, digest, image]`` on success; ``None`` when any
        step raised (the error is logged)
    """
    secret_digest = sha256(secret_code.encode('utf-8')).hexdigest()
    insert_sql = "INSERT INTO accounts (name, secret_code, image) VALUES (?, ?, ?)"
    row = (name, secret_digest, image,)
    try:
        GK.create_sync("org.gnome.Authenticator", None)
        attributes = GK.Attribute.list_new()
        GK.Attribute.list_append_string(attributes, 'id', secret_digest)
        GK.Attribute.list_append_string(attributes, 'secret_code', secret_code)
        GK.item_create_sync("org.gnome.Authenticator", GK.ItemType.GENERIC_SECRET,
                            repr(secret_digest), attributes, secret_code, False)
        self.conn.execute(insert_sql, row)
        self.conn.commit()
        new_id = self.get_latest_id()
        return [new_id, name, secret_digest, image]
    except Exception as e:
        logging.error("SQL: Couldn't add a new account : %s ", str(e))
def on_unlock(self, *args):
    """
    Password check and unlock

    Unlocks when ``settings.compare_password`` accepts the typed text, or
    when both the stored and the typed password are empty. On failure an
    error icon is shown in the password entry.
    """
    password_entry = self.builder.get_object("passwordEntry")
    typed_pass = password_entry.get_text()
    # NOTE(review): the original also computed a SHA-256 of typed_pass and
    # never used it. Confirm compare_password does its own hashing.
    if (settings.compare_password(typed_pass)
            or settings.get_password() == typed_pass == ""):
        password_entry.set_icon_from_icon_name(
            Gtk.EntryIconPosition.SECONDARY, None)
        settings.set_is_locked(False)
        password_entry.set_text("")
    else:
        password_entry.set_icon_from_icon_name(
            Gtk.EntryIconPosition.SECONDARY, "dialog-error-symbolic")
def get_random_string(length=12,
                      allowed_chars='abcdefghijklmnopqrstuvwxyz'
                                    'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'):
    """
    Return a random string of ``length`` characters drawn from
    ``allowed_chars``.

    The default length of 12 with the a-z, A-Z, 0-9 character set returns
    a 71-bit value. log_2((26+26+10)^12) =~ 71 bits
    """
    if not using_sysrandom:
        # Ugly hack, but better than predictability: re-seed the PRNG on
        # every call with a value an attacker will find hard to guess (PRNG
        # state, current time and the secret key). It may slightly change
        # the properties of the random sequence, which is still preferable
        # to a fully predictable one.
        seed_material = "%s%s%s" % (
            random.getstate(),
            time.time(),
            settings.SECRET_KEY)
        random.seed(hashlib.sha256(seed_material.encode('utf-8')).digest())

    picked = [random.choice(allowed_chars) for _ in range(length)]
    return ''.join(picked)
def gen_user_breadcrumb(size):
    """
    Used in comments posting.

    Builds a data line ``"<size> <elapsed> <count> <timestamp_ms>"`` and
    returns two newline-terminated lines: the base64 HMAC-SHA256 of that
    data, then the base64 of the data itself.

    :param size: integer length of the text being posted
    :return: ``"<base64 signature>\\n<base64 data>\\n"``
    """
    key = 'iN4$aGr0m'
    dt = int(time.time() * 1000)

    # typing time elapsed
    time_elapsed = randint(500, 1500) + size * randint(500, 1500)

    # Floor division keeps the count an integer on Python 3 as well.
    text_change_event_count = max(1, size // randint(3, 5))

    data = '{size!s} {elapsed!s} {count!s} {dt!s}'.format(**{
        'size': size,
        'elapsed': time_elapsed,
        'count': text_change_event_count,
        'dt': dt
    })
    data_bytes = data.encode('ascii')
    signature = hmac.new(
        key.encode('ascii'), data_bytes, digestmod=hashlib.sha256).digest()

    # b64encode returns bytes; decode so the text does not contain "b'...'".
    return '{0!s}\n{1!s}\n'.format(
        base64.b64encode(signature).decode('ascii'),
        base64.b64encode(data_bytes).decode('ascii'))
def fix_cxi(filename, xorpad_file):
    """Set the SD flag in a file's exheader and refresh the stored hash.

    Reads 0x400 bytes at offset 0x200, optionally XORs them with the xorpad,
    ORs bit 2 into byte 0xD, writes ``sha256(exheader)`` at offset 0x160 and
    writes the exheader back at 0x200 (XORed again when a xorpad was given).

    :param filename: path of the file to patch in place.
    :param xorpad_file: path of the xorpad, or ``None`` when no XOR step is
        needed.
    :returns: the little-endian 64-bit value at exheader offset 0x1C0,
        divided by 1024 (floor division).
    """
    with open(filename, "r+b") as f:
        # get exheader
        f.seek(0x200)
        exheader = bytearray(f.read(0x400))

        # decrypt exheader when needed
        if xorpad_file is not None:
            with open(xorpad_file, "rb") as pad:
                xorpad = bytearray(pad.read(0x400))
            exheader = xor(exheader, xorpad)

        # set sd flag in exheader
        exh_flags = exheader[0xD] | 2
        exheader = exheader[:0xD] + struct.pack("B", exh_flags) + exheader[0xE:]

        # reset the hash
        f.seek(0x160)
        f.write(sha256(exheader))

        # write back modified exheader
        f.seek(0x200)

        # return save data size to be used on make_cia
        # (floor division keeps it an integer on Python 3 too)
        save_data_size = struct.unpack("<Q", exheader[0x1C0:0x1C8])[0] // 1024

        # reencrypt exheader when needed
        if xorpad_file is not None:
            exheader = xor(exheader, xorpad)
        f.write(exheader)

    return save_data_size
def search_hash(cls, needle):
    """
    Search samples by hash (sha256, sha1 or md5).

    The needle is lower-cased and must start with at least five hex digits.
    Function-hash search is currently disabled, so the second element of
    the result is always ``None``.

    Returns (samples, functions); ``([], None)`` when the needle is
    rejected.
    """
    needle = needle.lower()
    if not re.match("[0-9a-f]{5,}", needle):
        # Same tuple shape as the success path, so callers can unpack.
        return [], None

    a = Sample.query.filter_by(sha256=needle).all()
    b = Sample.query.filter_by(sha1=needle).all()
    c = Sample.query.filter_by(md5=needle).all()
    results = list(set(a + b + c))

    function_results = None
    # XXX fix this
    # if re.match("[0-9a-f]{8}", needle):
    #     function_results = cls.get_functions_by_machoc_hash(needle)
    return results, function_results
def ForgeSignature(message, pubKey):
    """Build a forged signature block for ``message``.

    The number computed here is the cube root of a block built from
    ``ASN1_GOOP`` + the message hash, followed by filler bits. The layout
    follows the writeup mentioned in the comments below.

    :param message: message passed to ``sha256``.
    :param pubKey: pair ``(e, n)``; it is unpacked but its values are not
        used in the computation.
    :returns: ``IntegerToBytes`` of the cube root, ``KEY_BYTESIZE`` long.
    """
    e, n = pubKey

    digest = sha256(message)
    suffix_len = len(digest) + len(ASN1_GOOP)

    D = BytesToInteger(ASN1_GOOP + digest)
    Dbits = (suffix_len + 1) * 8

    # Strangely enough, Finney assumes N to be a power of 3, but here it's
    # not and it still works.
    N = (2 ** Dbits) - D

    # The -4 is to eliminate the bytes that need to be there, 00 01 at the
    # start of the signature, and FF 00 just before ASN1_GOOP.
    X = (KEY_BYTESIZE - suffix_len - 4) * 8

    # We can fit anything into the X bits leftover for garbage, so we pick
    # the largest number we can fit.
    filler = 2 ** X - 1

    # In the writeup, the key bit size gets 15 bits removed; here I do the
    # same.
    block = 2 ** (KEY_BITSIZE - 15) - N * (2 ** X) + filler

    return IntegerToBytes(cube_root(block), KEY_BYTESIZE)
def sha256(self):
    """Return the hex SHA-256 digest of ``self.stream``."""
    digest = hashlib.sha256(self.stream)
    return digest.hexdigest()
def OTX_Query_File(self, data):
    """Query the OTX file-indicator API for ``data`` and report the result.

    The ``general`` and ``analysis`` sections are fetched. When the analysis
    section holds an analysis, a full report is emitted. Otherwise a short
    report says the file has not been analyzed. Any failure while fetching
    or reading the response is reported through ``self.error``.

    :param data: indicator value placed in the URL (a file hash).
    """
    baseurl = "https://otx.alienvault.com:443/api/v1/indicators/file/%s/" % data
    headers = self._getHeaders()
    sections = ['general', 'analysis']
    IP_ = {}
    try:
        for section in sections:
            queryurl = baseurl + section
            IP_[section] = json.loads(requests.get(queryurl, headers=headers).content)

        if IP_['analysis']['analysis']:
            # file has been analyzed before
            pulse_info = IP_.get('general', {}).get('pulse_info', {})
            analysis = IP_.get('analysis', {})
            results = analysis.get('analysis', {}).get('info', {}).get('results', {})
            self.report({
                'pulse_count': pulse_info.get('count', "0"),
                'pulses': pulse_info.get('pulses', "-"),
                'malware': analysis.get('malware', "-"),
                'page_type': analysis.get('page_type', "-"),
                'sha1': results.get('sha1', "-"),
                'sha256': results.get('sha256', "-"),
                'md5': results.get('md5', "-"),
                'file_class': results.get('file_class', "-"),
                'file_type': results.get('file_type', "-"),
                'filesize': results.get('filesize', "-"),
                'ssdeep': results.get('ssdeep')
            })
        else:
            # file has not been analyzed before
            self.report({
                'errortext': 'File has not previously been analyzed by OTX!',
                'pulse_count': IP_['general']['pulse_info']['count'],
                'pulses': IP_['general']['pulse_info']['pulses']
            })
    except Exception:
        # Not a bare ``except``: SystemExit and KeyboardInterrupt must
        # propagate rather than being reported as an API error.
        self.error('API Error! Please verify data type is correct.')
def run(self):
    """Dispatch the analyzer job to the matching OTX query method.

    Only the ``query`` service is supported. For ``file`` observables the
    SHA-256 comes from the attachment hashes when present, otherwise the
    file is read and hashed. ``url``, ``domain``, ``ip`` and ``hash``
    observables pass their ``data`` parameter straight on.
    """
    Analyzer.run(self)

    if self.service == 'query':
        if self.data_type == 'file':
            hashes = self.getParam('attachment.hashes', None)
            if hashes is None:
                filepath = self.getParam('file', None, 'File is missing')
                # Hash the raw bytes: text mode would decode or translate
                # the content, and hashlib needs bytes.
                with open(filepath, 'rb') as sample:
                    file_hash = hashlib.sha256(sample.read()).hexdigest()
            else:
                # find SHA256 hash
                file_hash = next(h for h in hashes if len(h) == 64)
            self.OTX_Query_File(file_hash)
        elif self.data_type == 'url':
            data = self.getParam('data', None, 'Data is missing')
            self.OTX_Query_URL(data)
        elif self.data_type == 'domain':
            data = self.getParam('data', None, 'Data is missing')
            self.OTX_Query_Domain(data)
        elif self.data_type == 'ip':
            data = self.getParam('data', None, 'Data is missing')
            self.OTX_Query_IP(data)
        elif self.data_type == 'hash':
            data = self.getParam('data', None, 'Data is missing')
            self.OTX_Query_File(data)
        else:
            self.error('Invalid data type')
    else:
        self.error('Invalid service')
def run(self):
    """Query the remote API for a hash, file or filename and report it.

    ``hash`` and ``file`` observables use the ``scan/`` endpoint (files are
    identified by their SHA-256). ``filename`` observables use the filename
    search endpoint. While the API answers with its rate-limit message the
    request is retried after 60 seconds.
    """
    rate_limit_message = ("Exceeded maximum API requests per minute(5). "
                          "Please try again later.")
    try:
        if self.data_type == 'hash':
            query_url = 'scan/'
            query_data = self.getParam('data', None, 'Hash is missing')
        elif self.data_type == 'file':
            query_url = 'scan/'
            hashes = self.getParam('attachment.hashes', None)
            if hashes is None:
                filepath = self.getParam('file', None, 'File is missing')
                # Hash the raw bytes: text mode would decode or translate
                # the content, and hashlib needs bytes.
                with open(filepath, 'rb') as sample:
                    query_data = hashlib.sha256(sample.read()).hexdigest()
            else:
                # find SHA256 hash
                query_data = next(h for h in hashes if len(h) == 64)
        elif self.data_type == 'filename':
            query_url = 'search?query=filename:'
            query_data = self.getParam('data', None, 'Filename is missing')
        else:
            self.notSupported()

        url = str(self.basic_url) + str(query_url) + str(query_data)

        while True:
            # NOTE(review): verify=False turns off TLS certificate checks.
            # Kept as in the original; confirm it is intentional.
            r = requests.get(url, headers=self.headers,
                             auth=HTTPBasicAuth(self.api_key, self.secret),
                             verify=False)
            # The original test, `"error" in X == msg`, is a chained
            # comparison that can never be true. The response schema is not
            # visible here, so accept the message either as the ``response``
            # value itself or under its ``error`` key.
            response = r.json().get('response')
            if isinstance(response, dict):
                message = response.get('error')
            else:
                message = response
            if message != rate_limit_message:
                break
            time.sleep(60)

        self.report({'results': r.json()})

    except ValueError as e:
        self.unexpectedError(e)
def _has_valid_checksum(candidate):
    """Tell whether a Base58 string's last 4 decoded bytes match its checksum.

    The checksum is the first four bytes of SHA-256(SHA-256(payload)).
    """
    clear = base58.b58decode(str(candidate))
    ori = clear[:-4]
    chk = clear[-4:]
    rechk = hashlib.sha256(hashlib.sha256(ori).digest()).digest()
    return chk == rechk[:4]


def solve(x):
    """Search for a Base58 string near ``x`` that has a valid checksum.

    First every single-character substitution is tried, then every
    substitution of two different positions. Each candidate is printed
    before it is checked.

    Example call: ``solve('15GJF8Do1NDSMy4eV8H82dfFtTvKaqYyhg')``

    :param x: Base58 string with up to two wrong characters.
    :returns: the first candidate whose checksum verifies, or ``None`` when
        none does.
    """
    base58char = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'

    # Pass 1: one substituted character.
    for i in range(len(x)):
        for char in base58char:
            cry = x[:i] + char + x[i + 1:]
            print(cry)
            if _has_valid_checksum(cry):
                return cry

    # Pass 2: two substituted characters at different positions.
    for i in range(len(x)):
        for j in range(len(x)):
            if i == j:
                continue
            for charI in base58char:
                for charJ in base58char:
                    cry = x[:i] + charI + x[i + 1:]
                    cry = cry[:j] + charJ + cry[j + 1:]
                    print(cry)
                    if _has_valid_checksum(cry):
                        return cry

    return None
def hash_password(password, salt):
    """Return ``"<hex sha256 of salt + password>:<salt>"``.

    :param password: clear-text password (str).
    :param salt: salt string; it is appended to the result after a colon.
    """
    salted = salt.encode() + password.encode()
    digest = hashlib.sha256(salted).hexdigest()
    return digest + ':' + salt


# Check a hashed password
def check_hash(password, hashed_pw):
    """Check a password against a ``"<hex digest>:<salt>"`` string.

    :param password: clear-text password to test.
    :param hashed_pw: stored value of the form ``"<hex sha256>:<salt>"``.
    :returns: ``True`` when SHA-256(salt + password) matches the stored
        digest, ``False`` otherwise.
    :raises ValueError: when ``hashed_pw`` contains no colon.
    """
    import hmac  # local import: only needed for the comparison below

    # Split on the first colon only. The hex digest never contains one, but
    # the salt may.
    stored_digest, salt = hashed_pw.split(':', 1)
    candidate = hashlib.sha256(salt.encode() + password.encode()).hexdigest()

    # Constant-time comparison, so timing does not leak the matching prefix.
    return hmac.compare_digest(stored_digest.encode('utf-8'),
                               candidate.encode('utf-8'))


# Prompts the user for text input
def sha256(self):
    """
    Return the hex SHA-256 digest of the content file.

    The file is opened in binary read-only mode and read in chunks of
    128 hash blocks. If the file can't be opened, None is returned.
    """
    if not self.open(mode=NNTPFileMode.BINARY_RO):
        return None

    digest = hashlib.sha256()
    read_size = 128 * digest.block_size
    while True:
        chunk = self.stream.read(read_size)
        if chunk == b'':
            break
        digest.update(chunk)

    return digest.hexdigest()
def write_record(self, bdist_dir, distinfo_dir):
    """Write the RECORD file listing every file under ``bdist_dir``.

    Each row is ``(relative path with '/' separators, 'sha256=<urlsafe
    base64 digest>', size in bytes)``. The RECORD file itself is listed
    with an empty hash and size.

    :param bdist_dir: root directory that is walked.
    :param distinfo_dir: directory in which ``RECORD`` is created.
    """
    from wheel.util import urlsafe_b64encode

    record_path = os.path.join(distinfo_dir, 'RECORD')
    record_relpath = os.path.relpath(record_path, bdist_dir)

    with open_for_csv(record_path, 'w+') as record_file:
        writer = csv.writer(record_file)
        # Walk lazily, after RECORD has been created, in sorted order.
        for root, subdirs, filenames in os.walk(bdist_dir):
            subdirs.sort()
            for filename in sorted(filenames):
                full_path = os.path.join(root, filename)
                rel = os.path.relpath(full_path, bdist_dir)
                if rel == record_relpath:
                    # RECORD cannot contain its own hash.
                    digest_field = ''
                    size_field = ''
                else:
                    with open(full_path, 'rb') as fh:
                        payload = fh.read()
                    raw_digest = hashlib.sha256(payload).digest()
                    digest_field = 'sha256=' + native(urlsafe_b64encode(raw_digest))
                    size_field = len(payload)
                row_path = os.path.relpath(
                    full_path, bdist_dir).replace(os.path.sep, '/')
                writer.writerow((row_path, digest_field, size_field))
def test_verifying_zipfile():
    """Check VerifyingZipFile hash checking in default and strict mode."""
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = StringIO()
    archive = zipfile.ZipFile(sio, 'w')
    archive.writestr("one", b"first file")
    archive.writestr("two", b"second file")
    archive.writestr("three", b"third file")
    archive.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')

    def read_must_fail(member):
        # Reading ``member`` has to raise BadWheelFile.
        try:
            vzf.open(member).read()
        except wheel.install.BadWheelFile:
            return
        raise Exception("expected exception 'BadWheelFile()'")

    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    read_must_fail("three")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    read_must_fail("two")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def sign(wheelfile, replace=False, get_keyring=get_keyring):
    """Sign a wheel

    Signs a payload holding the SHA-256 of the wheel's RECORD file and
    stores the signature as ``RECORD.jws`` inside the archive.

    :param wheelfile: path of the wheel to sign.
    :param replace: accepted but not used by this function.
    :param get_keyring: callable returning ``(WheelKeys, keyring)``.
    :raises WheelError: when the wheel already contains ``RECORD.jws``.
    """
    WheelKeys, keyring = get_keyring()

    ed25519ll = signatures.get_ed25519ll()

    wf = WheelFile(wheelfile, append=True)
    wk = WheelKeys().load()

    name = wf.parsed_filename.group('name')
    signer = wk.signers(name)[0]
    sys.stdout.write("Signing {0} with {1}\n".format(name, signer[1]))

    verify_key = signer[1]
    secret_key = keyring.get_keyring().get_password('wheel', verify_key)
    keypair = ed25519ll.Keypair(urlsafe_b64decode(binary(verify_key)),
                                urlsafe_b64decode(binary(secret_key)))

    record_name = wf.distinfo_name + '/RECORD'
    sig_name = wf.distinfo_name + '/RECORD.jws'
    if sig_name in wf.zipfile.namelist():
        raise WheelError("Wheel is already signed.")

    record_digest = hashlib.sha256(wf.zipfile.read(record_name)).digest()
    payload = {"hash": "sha256=" + native(urlsafe_b64encode(record_digest))}
    sig = signatures.sign(payload, keypair)
    wf.zipfile.writestr(sig_name, json.dumps(sig, sort_keys=True))
    wf.zipfile.close()
def hmacsha256(data, key):
    """Return the raw HMAC-SHA256 of ``data`` computed with ``key``."""
    mac = hmac.new(key, msg=data, digestmod=hashlib.sha256)
    return mac.digest()
def write_record(self, bdist_dir, distinfo_dir):
    """Write the RECORD file listing every file under ``bdist_dir``.

    Each row is ``(relative path with '/' separators, 'sha256=<urlsafe
    base64 digest>', size in bytes)``. The RECORD file itself is listed
    with an empty hash and size.

    :param bdist_dir: root directory that is walked.
    :param distinfo_dir: directory in which ``RECORD`` is created.
    """
    from .util import urlsafe_b64encode

    record_path = os.path.join(distinfo_dir, 'RECORD')
    record_relpath = os.path.relpath(record_path, bdist_dir)

    with open_for_csv(record_path, 'w+') as record_file:
        writer = csv.writer(record_file)
        # Walk lazily, after RECORD has been created, in sorted order.
        for root, subdirs, filenames in os.walk(bdist_dir):
            subdirs.sort()
            for filename in sorted(filenames):
                full_path = os.path.join(root, filename)
                rel = os.path.relpath(full_path, bdist_dir)
                if rel == record_relpath:
                    # RECORD cannot contain its own hash.
                    digest_field = ''
                    size_field = ''
                else:
                    with open(full_path, 'rb') as fh:
                        payload = fh.read()
                    raw_digest = hashlib.sha256(payload).digest()
                    digest_field = 'sha256=' + native(urlsafe_b64encode(raw_digest))
                    size_field = len(payload)
                row_path = os.path.relpath(
                    full_path, bdist_dir).replace(os.path.sep, '/')
                writer.writerow((row_path, digest_field, size_field))
def sign(wheelfile, replace=False, get_keyring=get_keyring):
    """Sign a wheel

    Signs a payload holding the SHA-256 of the wheel's RECORD file and
    stores the signature as ``RECORD.jws`` inside the archive.

    :param wheelfile: path of the wheel to sign.
    :param replace: accepted but not used by this function.
    :param get_keyring: callable returning ``(WheelKeys, keyring)``.
    :raises WheelError: when the wheel already contains ``RECORD.jws``.
    """
    WheelKeys, keyring = get_keyring()

    ed25519ll = signatures.get_ed25519ll()

    wf = WheelFile(wheelfile, append=True)
    wk = WheelKeys().load()

    name = wf.parsed_filename.group('name')
    signer = wk.signers(name)[0]
    sys.stdout.write("Signing {0} with {1}\n".format(name, signer[1]))

    verify_key = signer[1]
    secret_key = keyring.get_keyring().get_password('wheel', verify_key)
    keypair = ed25519ll.Keypair(urlsafe_b64decode(binary(verify_key)),
                                urlsafe_b64decode(binary(secret_key)))

    record_name = wf.distinfo_name + '/RECORD'
    sig_name = wf.distinfo_name + '/RECORD.jws'
    if sig_name in wf.zipfile.namelist():
        raise WheelError("Wheel is already signed.")

    record_digest = hashlib.sha256(wf.zipfile.read(record_name)).digest()
    payload = {"hash": "sha256=" + native(urlsafe_b64encode(record_digest))}
    sig = signatures.sign(payload, keypair)
    wf.zipfile.writestr(sig_name, json.dumps(sig, sort_keys=True))
    wf.zipfile.close()
def get_rings_checksum():
    """Returns sha256 checksum for rings in /etc/swift.

    Every key of ``SWIFT_RINGS`` names a ring file
    ``<SWIFT_CONF_DIR>/<ring>.<SWIFT_RING_EXT>``. The contents of the files
    that exist are fed into one running hash, and missing files are skipped.
    """
    digest = hashlib.sha256()
    for ring_name in SWIFT_RINGS:
        ring_file = os.path.join(
            SWIFT_CONF_DIR, '{}.{}'.format(ring_name, SWIFT_RING_EXT))
        if os.path.isfile(ring_file):
            with open(ring_file, 'rb') as handle:
                digest.update(handle.read())

    return digest.hexdigest()
def get_builders_checksum():
    """Returns sha256 checksum for builders in /etc/swift.

    Every value of ``SWIFT_RINGS`` is a builder file path. The contents of
    the paths that exist are fed into one running hash, and missing paths
    are skipped.
    """
    digest = hashlib.sha256()
    for builder_path in SWIFT_RINGS.values():
        if os.path.exists(builder_path):
            with open(builder_path, 'rb') as handle:
                digest.update(handle.read())

    return digest.hexdigest()
def _generate_id(self):
    """Return a new id: hex SHA-256 of the secret, a UUID4 and the time.

    :returns: 64-character hex string.
    """
    secret = self._secret
    # hashlib needs bytes; accept the secret as either text or bytes.
    if not isinstance(secret, bytes):
        secret = secret.encode('utf-8')
    nonce = (str(uuid.uuid4()) + str(time.time())).encode('utf-8')
    new_id = hashlib.sha256(secret + nonce)
    return new_id.hexdigest()
def _generate_hmac(self, session_id):
    """Return the hex HMAC-SHA256 linking ``session_id`` and the secret.

    :param session_id: session id, as text or bytes.
    :returns: 64-character hex string.
    """
    def as_bytes(value):
        # hmac needs bytes; accept text as well.
        return value if isinstance(value, bytes) else value.encode('utf-8')

    # NOTE(review): the session id is used as the HMAC *key* and the secret
    # as the *message*, which looks swapped. Left unchanged because swapping
    # would invalidate every HMAC already issued -- confirm before changing.
    return hmac.new(as_bytes(session_id), as_bytes(self._secret),
                    hashlib.sha256).hexdigest()
def get_password_hash(pw, current_salt):
    """Gets the password hash for the two-step verification.

    The result is SHA-256(salt + UTF-8 password + salt) as raw bytes.

    current_salt should be the byte array provided by
    invoking GetPasswordRequest()
    """
    # Passwords are encoded as UTF-8
    # At https://github.com/DrKLO/Telegram/blob/e31388
    # src/main/java/org/telegram/ui/LoginActivity.java#L2003
    encoded_pw = pw.encode('utf-8')
    salted = current_salt + encoded_pw + current_salt
    return sha256(salted).digest()

# endregion
def get_file_hash(file_name):
    """Get a sha256 hash of a file.

    The file is read in fixed-size chunks, so large files are not loaded
    into memory all at once.

    :param file_name: path of the file to hash.
    :returns: hex SHA-256 digest of the file's bytes.
    """
    sha = hashlib.sha256()
    with open(file_name, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            sha.update(chunk)
    return sha.hexdigest()
def change_password(login, password):
    """Store a new salted password hash for ``login``.

    A fresh random salt is generated. The stored hash is the hex SHA-256 of
    the password followed by the salt.

    :param login: login whose row is updated.
    :param password: new clear-text password.
    """
    salt = uuid4().hex
    hashed = sha256(password.encode() + salt.encode()).hexdigest()

    g.db = connect_db()
    try:
        g.db.execute('UPDATE users SET password=?, salt=? WHERE login=?',
                     (hashed, salt, login))
        g.db.commit()
    finally:
        # Close the connection even when the update fails.
        g.db.close()


##
# Change admin status
# status: 1 = admin, 0 = user
def create_user(login, password, isAdmin=0):
    """Insert a new user with a salted password hash.

    A fresh random salt is generated. The stored hash is the hex SHA-256 of
    the password followed by the salt.

    :param login: login of the new user.
    :param password: clear-text password.
    :param isAdmin: value stored in the ``isAdmin`` column (default 0).
    """
    salt = uuid4().hex
    hashed = sha256(password.encode() + salt.encode()).hexdigest()

    g.db = connect_db()
    try:
        g.db.execute(
            'INSERT INTO users(login, password, salt, isAdmin) VALUES (?,?,?,?)',
            (login, hashed, salt, isAdmin))
        g.db.commit()
    finally:
        # Close the connection even when the insert fails.
        g.db.close()


##
# Is the user an admin?
def l1l11(self, l11l11, l1l1l):
    """Return the hex digest of a keyed hash over the two arguments.

    The key is ``self.l1l111`` and the message is ``str(l11l11)``, a
    separator and ``str(l1l1l)``. Key and message are encoded with a name
    produced by ``self.l1111``.

    NOTE(review): the source is obfuscated. ``l11ll1`` presumably is
    ``hmac`` and ``l1ll`` presumably ``hashlib`` -- confirm against the
    module's imports.
    """
    key_bytes = self.l1l111.encode(self.l1111(u'??????'))
    message = str(l11l11) + self.l1111(u'??') + str(l1l1l)
    message_bytes = message.encode(self.l1111(u'??????'))
    mac = l11ll1.new(key_bytes, message_bytes, digestmod=l1ll.sha256)
    return mac.hexdigest()