我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用Crypto.Util.number.bytes_to_long()。
def gen_key():
    """Generate the triple ``(p, q, e)`` of an RSA-style key.

    The block relies on module-level names that are not defined here:
    ``k`` and ``e`` (presumably the modulus size in bits and the public
    exponent -- confirm against the module), ``getPrime``, ``isPrime``,
    ``gcd``, ``get_bit``, ``pi_b``, ``bytes_to_long`` and ``long_to_bytes``.

    All size computations use ``//`` so that they stay integers on both
    Python 2 and Python 3 (plain ``/`` would hand floats to ``getPrime``
    and ``get_bit`` on Python 3).

    Returns:
        tuple: ``(p, q, e)``.
    """
    # Draw p until e is coprime to p - 1.
    while True:
        p = getPrime(k // 2)
        if gcd(e, p - 1) == 1:
            break

    # A second, temporary prime yields the template product n_t from which
    # the pieces t and y are extracted.
    q_t = getPrime(k // 2)
    n_t = p * q_t
    t = get_bit(n_t, k // 16, 1)
    y = get_bit(n_t, 5 * k // 8, 0)

    # A slice of p, passed through pi_b in "encrypt" mode, forms the
    # middle part of the target number n.
    p4 = get_bit(p, 5 * k // 16, 1)
    u = pi_b(p4, 1)
    n = bytes_to_long(long_to_bytes(t) + long_to_bytes(u) + long_to_bytes(y))

    # Start from the integer quotient n // p, force it odd, then perturb it
    # until it is prime and e is coprime to q - 1.
    q = n // p
    if q % 2 == 0:
        q += 1
    while True:
        if isPrime(q) and gcd(e, q - 1) == 1:
            break
        m = getPrime(k // 16) + 1
        q ^= m
    return (p, q, e)
def setUp(self):
    """Import the RSA helpers and derive the remaining key components.

    Publishes ``RSA``, ``Random`` and ``bytes_to_long`` as module globals,
    then fills in ``n``, ``p``, ``q``, ``d``, ``u`` and ``rsa`` on the test
    instance from the hex-encoded ``modulus`` and ``prime_factor``
    attributes and the exponent ``e``.
    """
    global RSA, Random, bytes_to_long
    from Crypto.PublicKey import RSA
    from Crypto import Random
    from Crypto.Util.number import bytes_to_long, inverse

    modulus = bytes_to_long(a2b_hex(self.modulus))
    first_prime = bytes_to_long(a2b_hex(self.prime_factor))
    self.n = modulus
    self.p = first_prime

    # Everything else follows from n, e and p.
    second_prime = modulus // first_prime
    self.q = second_prime
    phi = (first_prime - 1) * (second_prime - 1)
    self.d = inverse(self.e, phi)
    self.u = inverse(first_prime, second_prime)  # u = p**-1 (mod q)

    self.rsa = RSA
def decode(self, derEle, noLeftOvers=0):
    """Parse a complete DER INTEGER element and load it into this object.

    @param derEle       A complete INTEGER DER element; it must begin
                        with the DER INTEGER tag.
    @param noLeftOvers  Whether it is acceptable for parsing to finish
                        without consuming every byte of derEle.
    @return Index of the first byte of derEle that was not consumed.

    Raises ValueError if the element is not a valid non-negative INTEGER.
    Raises IndexError if the element is too short.
    """
    # The generic DER parsing is delegated to the base class.
    consumed = DerObject.decode(self, derEle, noLeftOvers)

    expected_tag = self.typeTags['INTEGER']
    if self.typeTag != expected_tag:
        raise ValueError("Not a DER INTEGER.")

    # A set top bit in the first payload byte marks a negative value.
    first_byte = bord(self.payload[0])
    if first_byte > 127:
        raise ValueError("Negative INTEGER.")

    self.value = bytes_to_long(self.payload)
    return consumed
def generateQ(randfunc):
    """Derive a 160-bit prime ``q`` from a fresh 20-byte seed.

    ``randfunc(20)`` supplies the seed ``S``.  The SHA digests of ``S`` and
    of ``S + 1`` are XOR-ed byte by byte to form the starting candidate,
    which is then advanced in steps of 2 until ``isPrime`` accepts it.

    Returns:
        tuple: ``(S, q)``.

    Raises:
        RuntimeError: if the prime found is not strictly between
            ``2**159`` and ``2**160``.
    """
    S = randfunc(20)
    hash1 = SHA.new(S).digest()
    hash2 = SHA.new(long_to_bytes(bytes_to_long(S) + 1)).digest()
    q = bignum(0)
    for i in range(0, 20):
        c = bord(hash1[i]) ^ bord(hash2[i])
        if i == 0:
            c = c | 128  # set the top bit of the most significant byte
        if i == 19:
            c = c | 1    # set the lowest bit so the candidate is odd
        q = q * 256 + c
    while not isPrime(q):
        q = q + 2
    # Plain integer literals: the former ``159L``/``160L`` spelling is a
    # syntax error on Python 3 and is unnecessary on Python 2.
    if pow(2, 159) < q < pow(2, 160):
        return S, q
    raise RuntimeError('Bad q value generated')
def _decrypt_stealth_auth(content, authentication_cookie):
    """Decrypt ``content`` with AES in CTR mode keyed by the cookie.

    Layout of ``content``: byte 1 is the authentication type, bytes 2-17
    are the input vector, and everything from byte 18 on is the encrypted
    payload.  The input vector seeds the 128-bit counter.

    Returns the decrypted payload.
    """
    from Crypto.Cipher import AES
    from Crypto.Util import Counter
    from Crypto.Util.number import bytes_to_long

    input_vector = content[1:17]
    ciphertext = content[17:]

    ctr = Counter.new(128, initial_value = bytes_to_long(input_vector))
    aes = AES.new(authentication_cookie, AES.MODE_CTR, counter = ctr)
    plaintext = aes.decrypt(ciphertext)
    return plaintext
def generateQ(randfunc):
    """Produce a seed and a prime ``q`` lying between 2**159 and 2**160.

    A 20-byte seed is read from ``randfunc``; the SHA digest of the seed
    is XOR-ed with the SHA digest of the seed plus one, with the highest
    and lowest bits forced on.  The result is stepped by 2 until it is
    prime.

    Returns ``(seed, q)``; raises RuntimeError when the prime falls
    outside the open interval (2**159, 2**160).
    """
    lower_bound = pow(2, 159)
    upper_bound = pow(2, 160)

    seed = randfunc(20)
    digest_a = SHA.new(seed).digest()
    digest_b = SHA.new(long_to_bytes(bytes_to_long(seed) + 1)).digest()

    candidate = bignum(0)
    for index in range(20):
        mixed = bord(digest_a[index]) ^ bord(digest_b[index])
        if index == 0:
            mixed |= 128   # most significant byte: force the top bit
        elif index == 19:
            mixed |= 1     # least significant byte: force oddness
        candidate = candidate * 256 + mixed

    while not isPrime(candidate):
        candidate = candidate + 2

    if not (lower_bound < candidate < upper_bound):
        raise RuntimeError('Bad q value generated')
    return seed, candidate
def test_rsa_broadcast_icectf(self):
    '''Special case where a standard broadcast attack will not work
    because plaintext is bigger than any of the provided modulus.

    Every individual RSA encryption loses some information, but when
    enough pubkeys and ciphertexts are gathered, the plaintext can be
    "magically" recovered.

    http://blog.atx.name/icectf/#Agents
    '''
    exponent = 3
    # Only the modulus of each generated public key is kept.
    ns = [keygen_rsa(1024, exponent)[1][0] for _ in range(200)]

    # making sure msg is bigger than the biggest modulus; the maximum is
    # loop-invariant, so it is computed once
    biggest_modulus = max(ns)
    msg = 'the secret msg is '
    while bytes_to_long(msg) <= biggest_modulus:
        msg += random_chars(1)
    msg += random_chars(1000)

    # The integer form of the message is the same for every key.
    msg_as_int = bytes_to_long(msg)
    pairs = [(n, encrypt_rsa((n, exponent), msg_as_int)) for n in ns]

    rec = rsa_broadcast_attack(pairs)
    # assertEqual reports both values on failure, unlike assertTrue(a == b).
    self.assertEqual(rec, msg)
def pi_b(x, m):
    '''Run the integer ``x`` through DES, 8 bytes at a time.

    m:
        1: encrypt
        0: decrypt

    ``x`` is serialised with ``long_to_bytes``, cut into 8-byte slices,
    each slice is passed to the DES object's ``encrypt`` or ``decrypt``,
    and the concatenated output is converted back to an integer.  The DES
    key is the module-level name ``key`` (not defined in this block).
    '''
    enc = DES.new(key)
    method = enc.encrypt if m else enc.decrypt

    s = long_to_bytes(x)
    # ``range`` and a bytes join work on both Python 2 and Python 3; the
    # previous ``xrange`` / ``"" +=`` form only worked on Python 2.
    blocks = [s[a:a + 8] for a in range(0, len(s), 8)]
    return bytes_to_long(b"".join(method(block) for block in blocks))
def change_key(self, master_key):
    """Install ``master_key`` and rebuild everything derived from it.

    Steps performed:
      * check that the key is 128, 192 or 256 bits long;
      * create the AES-ECB object for the key;
      * compute the authentication key as the integer value of the
        encryption of 16 zero bytes;
      * precompute a 16 x 256 table where entry ``[i][j]`` is
        ``gf_2_128_mul(auth_key, j << (8 * i))``;
      * reset ``prev_init_value`` to None.

    Raises:
        InvalidInputException: if the key length is not 128, 192 or
            256 bits.
    """
    if len(master_key) * 8 not in (128, 192, 256):
        # Adjacent literals instead of a backslash continuation inside the
        # string, which used to embed the source indentation in the message.
        raise InvalidInputException('Error: Master key must be '
                                    '128, 192 or 256 bit')

    self.__master_key = master_key
    self.__aes_ecb = AES.new(self.__master_key, AES.MODE_ECB)
    self.__auth_key = bytes_to_long(self.__aes_ecb.encrypt(b'\x00' * 16))

    # precompute the table for multiplication in finite field
    # (one row per byte position, one column per 8-bit value)
    auth_key = self.__auth_key
    self.__pre_table = tuple(
        tuple(gf_2_128_mul(auth_key, j << (8 * i)) for j in range(256))
        for i in range(16)
    )

    self.prev_init_value = None  # reset
def __ghash(self, aad, txt):
    """Fold ``aad`` and ``txt`` into an integer tag.

    Each input is zero-padded to a multiple of 16 bytes and the two are
    concatenated.  For every 16-byte block the block's integer value is
    XOR-ed into the tag and the tag is passed through
    ``self.__times_auth_key``.  Finally the bit lengths of both inputs
    (``8 * len(aad)`` shifted left by 64, OR-ed with ``8 * len(txt)``)
    are XOR-ed in and the tag is passed through ``__times_auth_key`` once
    more.

    Returns:
        The resulting tag as an integer.
    """
    len_aad = len(aad)
    len_txt = len(txt)

    # padding: ``-n % 16`` is 0 for aligned input and ``16 - n % 16``
    # otherwise.  Building the buffer with ``+`` always creates a new
    # object, so a caller-owned bytearray ``aad`` is never extended in
    # place (the old ``data = aad; data += txt`` did that when ``aad`` was
    # already aligned).
    data = (aad + b'\x00' * (-len_aad % 16)
            + txt + b'\x00' * (-len_txt % 16))

    tag = 0
    assert len(data) % 16 == 0
    for i in range(len(data) // 16):
        tag ^= bytes_to_long(data[i * 16: (i + 1) * 16])
        tag = self.__times_auth_key(tag)
    tag ^= ((8 * len_aad) << 64) | (8 * len_txt)
    tag = self.__times_auth_key(tag)

    return tag
def getrandbits(self, k):
    """Return a python long integer with k random bits.

    If ``self._randfunc`` is None it is first set to
    ``Random.new().read``.  ``ceil_div(k, 8)`` bytes are requested from
    it, converted to an integer, and masked down to the low ``k`` bits.
    """
    if self._randfunc is None:
        self._randfunc = Random.new().read
    # Plain ``1`` instead of ``1L``: the long suffix is a syntax error on
    # Python 3, and Python 2 promotes to long automatically.
    mask = (1 << k) - 1
    return mask & bytes_to_long(self._randfunc(ceil_div(k, 8)))
def _exercise_primitive(self, rsaObj):
    """Check that the raw RSA operations of ``rsaObj`` invert each other.

    The key is randomly generated, so the known test vector cannot be
    compared directly; instead the test verifies that encryption and
    decryption (plain and blinded), signing and verification round-trip.
    """
    reference_ct = a2b_hex(self.ciphertext)

    # Decryption
    recovered_pt = rsaObj.decrypt((reference_ct,))

    # Encryption (2 arguments) must reproduce the reference ciphertext
    (reencrypted,) = rsaObj.encrypt(recovered_pt, b(""))
    self.assertEqual(b2a_hex(reference_ct), b2a_hex(reencrypted))

    # Blinded decryption: blind, decrypt, unblind, compare
    factor = Random.new().read(len(reference_ct) - 1)
    blinded_ct = rsaObj.blind(reference_ct, factor)
    blinded_pt = rsaObj.decrypt((blinded_ct,))
    unblinded_pt = rsaObj.unblind(blinded_pt, factor)
    self.assertEqual(b2a_hex(recovered_pt), b2a_hex(unblinded_pt))

    expected_signature = (bytes_to_long(recovered_pt),)

    # Signing (2 arguments)
    produced_signature = rsaObj.sign(reference_ct, b(""))
    self.assertEqual(expected_signature, produced_signature)

    # Verification
    verdict = rsaObj.verify(reference_ct, expected_signature)
    self.assertEqual(1, verdict)
def _exercise_public_primitive(self, rsaObj):
    """Run the public-key operations of ``rsaObj`` without checking results.

    Encrypts the reference plaintext and then calls ``verify`` with the
    ciphertext as message and the plaintext integer as signature; the
    return value of ``verify`` is ignored.
    """
    message = a2b_hex(self.plaintext)

    # Encryption (2 arguments); the result must be a 1-tuple
    (encrypted,) = rsaObj.encrypt(message, b(""))

    # Verification is only exercised, not asserted
    signature = (bytes_to_long(message),)
    rsaObj.verify(encrypted, signature)
def _check_verification(self, rsaObj):
    """Assert that ``rsaObj.verify`` accepts the reference signature.

    The signature is the integer value of the reference plaintext and the
    message is the reference ciphertext.  Two tuple shapes are tried:
    the regular 1-tuple, and an overlong tuple with extra garbage at the
    end (a backward-compatibility hack to support some harmless misuse of
    the API).
    """
    signature = bytes_to_long(a2b_hex(self.plaintext))
    message = a2b_hex(self.ciphertext)

    accepted_shapes = (
        (signature,),      # rsaObj.verify expects a tuple
        (signature, ''),   # extra garbage at end of tuple
    )
    for candidate in accepted_shapes:
        self.assertEqual(1, rsaObj.verify(message, candidate))
def setUp(self):
    """Import the DSA helpers and remember the DSA module on the instance.

    ``DSA``, ``Random``, ``bytes_to_long`` and ``size`` are published as
    module globals; ``inverse`` is imported only into the local scope.
    """
    global DSA, Random
    global bytes_to_long, size

    from Crypto.PublicKey import DSA
    from Crypto import Random
    from Crypto.Util.number import bytes_to_long, inverse, size

    self.dsa = DSA
def test_construct_4tuple(self):
    """DSA (default implementation) constructed key (4-tuple)

    Converts the hex-encoded ``y``, ``g``, ``p`` and ``q`` attributes to
    integers, builds a key from them and runs the verification check.
    """
    hex_components = (self.y, self.g, self.p, self.q)
    key_tuple = tuple(
        bytes_to_long(a2b_hex(hex_value)) for hex_value in hex_components
    )
    public_key = self.dsa.construct(key_tuple)
    self._test_verification(public_key)
def test_construct_5tuple(self):
    """DSA (default implementation) constructed key (5-tuple)

    Converts the hex-encoded ``y``, ``g``, ``p``, ``q`` and ``x``
    attributes to integers, builds a key from them and runs the signing
    check followed by the verification check.
    """
    hex_components = (self.y, self.g, self.p, self.q, self.x)
    key_tuple = tuple(
        bytes_to_long(a2b_hex(hex_value)) for hex_value in hex_components
    )
    private_key = self.dsa.construct(key_tuple)
    for check in (self._test_signing, self._test_verification):
        check(private_key)
def _test_signing(self, dsaObj):
    """Assert that signing ``m_hash`` with ``k`` yields the reference (r, s).

    ``k``, ``m_hash``, ``r`` and ``s`` are hex-encoded attributes of the
    test instance; ``r`` and ``s`` are compared as integers.
    """
    nonce = a2b_hex(self.k)
    digest = a2b_hex(self.m_hash)
    expected = (
        bytes_to_long(a2b_hex(self.r)),
        bytes_to_long(a2b_hex(self.s)),
    )

    # Unpacking enforces that sign() returns exactly two values.
    r_actual, s_actual = dsaObj.sign(digest, nonce)
    self.assertEqual(expected, (r_actual, s_actual))
def _test_verification(self, dsaObj):
    """Assert that the reference signature verifies only for ``m_hash``.

    ``verify`` must return 1 for the reference hash and 0 once a zero
    byte has been appended to it.
    """
    digest = a2b_hex(self.m_hash)
    signature = (
        bytes_to_long(a2b_hex(self.r)),
        bytes_to_long(a2b_hex(self.s)),
    )

    genuine = dsaObj.verify(digest, signature)
    self.assertEqual(1, genuine)

    tampered = dsaObj.verify(digest + b("\0"), signature)
    self.assertEqual(0, tampered)
def pkcs_os2ip(x):
    """
    Convert an octet string to the nonnegative integer it represents.

    Input : x        octet string to be converted

    Output: x        corresponding nonnegative integer

    The inverse operation is pkcs_i2osp().
    """
    # Octet-string-to-integer conversion as defined in RFC 3447.
    as_integer = number.bytes_to_long(x)
    return as_integer
def diffie_hell_man(sock, bits=2048):
    """Run an unauthenticated Diffie-Hellman exchange over ``sock``.

    A random 256-bit private exponent is drawn from ``os.urandom``, the
    public value ``g**a mod p`` is sent to the peer, the peer's public
    value is read back, and the SHA256 digest of the shared secret is
    returned.

    Args:
        sock: connected socket-like object providing ``sendall`` and
            ``recv``.
        bits: not read by this function; the group is fixed.  Kept so
            existing callers that pass it keep working.

    Returns:
        The SHA256 digest of the shared secret.

    Raises:
        ValueError: if the peer's public value is outside ``[2, p - 2]``
            (this includes an empty read).  Such values force the shared
            secret to a value an attacker can predict.
    """
    # using RFC 3526 MODP group 14 (2048 bits)
    p = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF
    g = 2
    a = bytes_to_long(os.urandom(32))  # a 256bit number, sufficiently large
    xA = pow(g, a, p)

    # Zero-pad to a multiple of 256 bytes so the message always has the
    # width the peer's recv(256) expects; leading zeros do not change the
    # decoded integer.  sendall() keeps writing until everything is sent,
    # whereas send() may stop after a partial write.
    sock.sendall(long_to_bytes(xA, 256))

    b = bytes_to_long(sock.recv(256))
    # Reject degenerate public values (0, 1, p - 1, or anything >= p).
    if not 2 <= b <= p - 2:
        raise ValueError('invalid Diffie-Hellman public value from peer')

    s = pow(b, a, p)
    return SHA256.new(long_to_bytes(s)).digest()
def test_negotiate(self, group=14):
    """Negotiate a key between a forked server and the client process.

    A listening socket is bound to an ephemeral port and the process is
    forked.  The child accepts the connection, negotiates, sends its key
    to the parent and exits.  The parent connects, negotiates, waits for
    the child, reads the child's key and asserts that both keys match.

    The child always leaves through ``os._exit``; otherwise it would
    return from this method and carry on running the rest of the test
    suite as a second copy of the test runner.
    """
    server = socket.socket()
    server.bind(('', 0))
    server.listen(1)
    port = server.getsockname()[1]

    pid = os.fork()

    # child process - aka, the server
    if pid == 0:
        exit_code = 1
        try:
            sock, _ = server.accept()
            server.close()
            local_key = pyDHE.new(group).negotiate(sock)
            # sendall() retries until the whole key has been written.
            sock.sendall(long_to_bytes(local_key))
            sock.close()
            exit_code = 0
        finally:
            # Never fall back into the test runner from the child.
            os._exit(exit_code)

    # parent - aka, the client
    sock = socket.socket()
    sock.connect(('', port))
    server.close()

    alice = pyDHE.new(group)
    local_key = alice.negotiate(sock)

    # Wait for this specific child and make sure it finished cleanly.
    _, status = os.waitpid(pid, 0)
    self.assertEqual(status, 0, "server process failed")

    remote_key = bytes_to_long(sock.recv(1024))
    sock.close()
    self.assertEqual(local_key, remote_key, "keys do not match")
def negotiate(self, sock):
    """Exchange public keys over ``sock`` and return the updated key.

    Sends ``self.getPublicKey()`` encoded with ``long_to_bytes``, reads up
    to 1024 bytes from the peer, and returns the result of ``self.update``
    applied to the integer value of what was read.

    Args:
        sock: connected socket-like object providing ``sendall`` and
            ``recv``.
    """
    # sendall() keeps writing until the whole public key has gone out;
    # send() is allowed to transmit only part of the buffer.
    sock.sendall(long_to_bytes(self.getPublicKey()))
    B = sock.recv(1024)  # 8192 bits
    return self.update(bytes_to_long(B))