我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用base58.b58encode()。
def solve(x):
    """Re-encode a Base58 string with a freshly computed 4-byte checksum.

    The input is Base58-decoded, its last four bytes are dropped, and the
    first four bytes of the double SHA-256 of the remaining payload are
    appended in their place before encoding again.

    Args:
        x: Base58-encoded value whose last four decoded bytes are treated
            as a checksum to be replaced.

    Returns:
        The result of ``base58.b58encode`` applied to payload + checksum.
    """
    decoded = base58.b58decode(x)
    payload = decoded[:-4]
    # Double SHA-256; only the leading four bytes serve as the checksum.
    digest = hashlib.sha256(hashlib.sha256(payload).digest()).digest()
    return base58.b58encode(payload + digest[:4])
def testTrustAnchorDisclosesEncryptedAttribute(addedEncryptedAttribute,
                                               symEncData,
                                               looper,
                                               userSignerA,
                                               trustAnchorSigner,
                                               trustAnchor):
    """Submit an ATTRIB operation carrying the boxed secret key and check it.

    The secret key and the TXN_ID of the previously added encrypted attribute
    are JSON-serialised and encrypted with a ``libnacl.public.Box`` built from
    ``trustAnchorSigner.naclSigner.keyraw`` and
    ``userSignerA.naclSigner.verraw``. The nonce and ciphertext are sent
    base58-encoded.
    """
    senderKey = trustAnchorSigner.naclSigner.keyraw
    recipientKey = userSignerA.naclSigner.verraw
    box = libnacl.public.Box(senderKey, recipientKey)

    payload = json.dumps({SKEY: symEncData.secretKey,
                          TXN_ID: addedEncryptedAttribute[TXN_ID]})
    # pack_nonce=False returns the nonce separately from the ciphertext
    nonce, boxedMsg = box.encrypt(payload.encode(), pack_nonce=False)

    op = {
        TARGET_NYM: userSignerA.verstr,
        TXN_TYPE: ATTRIB,
        NONCE: base58.b58encode(nonce),
        ENC: base58.b58encode(boxedMsg),
    }
    submitAndCheck(looper, trustAnchor, op,
                   identifier=trustAnchorSigner.verstr)
def Export(self):
    """
    Export this KeyPair's private key in WIF format.

    Layout of the 38-byte buffer before encoding: 0x80, the first 32 bytes
    of ``self.PrivateKey``, 0x01, then the first 4 bytes of
    ``Crypto.Default().Hash256`` over the preceding 34 bytes.

    Returns:
        str: The key in wif format
    """
    buf = bytearray(38)
    buf[0] = 0x80
    buf[1:33] = self.PrivateKey[0:32]
    buf[33] = 0x01
    # checksum is computed over everything written so far
    buf[34:38] = Crypto.Default().Hash256(buf[0:34])[0:4]
    return base58.b58encode(bytes(buf))
def test_should_throw_error_on_invalid_checksum(self):
    """PrivateKeyFromWIF must raise ValueError for a WIF with a bad checksum."""
    # 34 filler bytes with the first and last overwritten by 0x80 / 0x01
    raw = bytearray(34 * 'A', 'utf8')
    raw[0] = 0x80
    raw[33] = 0x01

    # deliberately bogus 4-byte checksum
    for byte in (0xDE, 0xAD, 0xBE, 0xEF):
        raw.append(byte)

    encoded = base58.b58encode(bytes(raw))

    with self.assertRaises(ValueError) as context:
        KeyPair.PrivateKeyFromWIF(encoded)

    self.assertEqual('Invalid WIF Checksum!', str(context.exception))
def testTrustAnchorDisclosesEncryptedAttribute(
        addedEncryptedAttribute, symEncData, looper,
        userSignerA, trustAnchorSigner, trustAnchor):
    """Submit an ATTRIB operation carrying the boxed secret key and check it.

    The secret key and the TXN_ID of the previously added encrypted attribute
    are JSON-serialised and encrypted with a ``libnacl.public.Box`` built from
    ``trustAnchorSigner.naclSigner.keyraw`` and
    ``userSignerA.naclSigner.verraw``. The nonce and ciphertext are sent
    base58-encoded.
    """
    box = libnacl.public.Box(trustAnchorSigner.naclSigner.keyraw,
                             userSignerA.naclSigner.verraw)

    plaintext = json.dumps({
        SKEY: symEncData.secretKey,
        TXN_ID: addedEncryptedAttribute[TXN_ID],
    }).encode()
    # pack_nonce=False returns the nonce separately from the ciphertext
    nonce, boxedMsg = box.encrypt(plaintext, pack_nonce=False)

    op = {
        TARGET_NYM: userSignerA.verstr,
        TXN_TYPE: ATTRIB,
        NONCE: base58.b58encode(nonce),
        ENC: base58.b58encode(boxedMsg),
    }
    submitAndCheck(looper, trustAnchor, op,
                   identifier=trustAnchorSigner.verstr)
def _fulfillment_to_details(fulfillment):
    """Encode a fulfillment as a details dictionary.

    Args:
        fulfillment: Crypto-conditions Fulfillment object

    Returns:
        dict: for 'ed25519-sha-256', the type and base58-encoded public key;
        for 'threshold-sha-256', the type, threshold and the recursively
        encoded ``body`` of every subcondition.

    Raises:
        UnsupportedTypeError: for any other ``type_name``.
    """
    kind = fulfillment.type_name

    if kind == 'ed25519-sha-256':
        encoded_key = base58.b58encode(fulfillment.public_key)
        return {
            'type': 'ed25519-sha-256',
            'public_key': encoded_key,
        }

    if kind == 'threshold-sha-256':
        subconditions = []
        for cond in fulfillment.subconditions:
            subconditions.append(_fulfillment_to_details(cond['body']))
        return {
            'type': 'threshold-sha-256',
            'threshold': fulfillment.threshold,
            'subconditions': subconditions,
        }

    raise UnsupportedTypeError(fulfillment.type_name)
def address_bytes_to_string(proto, buf):
    """Convert the hex-encoded address value of a protocol to its string form.

    Args:
        proto: object whose ``code`` attribute selects the conversion.
        buf: hex-encoded value (parsed with ``int(buf, 16)`` or
            ``binascii.unhexlify`` depending on the protocol).

    Returns:
        The textual address: an IP string, a port number string,
        ``<base32 address>:<port>`` for P_ONION, or the base58 encoding of
        the varint-length-prefixed payload for P_IPFS.

    Raises:
        ValueError: if the P_IPFS payload length does not match its varint
            prefix, or if the protocol code is not handled.
    """
    from .util import decode_big_endian_16

    code = proto.code

    if code == P_IP4:
        return str(IPAddress(int(buf, 16), 4).ipv4())

    if code == P_IP6:
        return str(IPAddress(int(buf, 16), 6).ipv6())

    if code in (P_TCP, P_UDP, P_DCCP, P_SCTP):
        return str(decode_big_endian_16(binascii.unhexlify(buf)))

    if code == P_ONION:
        raw = binascii.unhexlify(buf)
        # the trailing two bytes hold the port, everything before is the address
        addr_bytes = raw[:-2]
        port_bytes = raw[-2:]
        addr = base64.b32encode(addr_bytes).decode('ascii').lower()
        port = str(decode_big_endian_16(port_bytes))
        return ':'.join((addr, port))

    if code == P_IPFS:
        raw = binascii.unhexlify(buf)
        size, num_bytes_read = read_varint_code(raw)
        payload = raw[num_bytes_read:]
        if len(payload) != size:
            raise ValueError("inconsistent lengths")
        return base58.b58encode(payload)

    raise ValueError("unknown protocol")
def hex_private_key_to_WIF_private_key(hex_key):
    """
    Converts a raw 256-bit hex private key to WIF format.

    The key is prefixed with "80", hashed twice with ``hash_sha256``, and the
    first 8 characters of the second hash are appended as a checksum before
    Base58 encoding.

    NOTE(review): ``hash_sha256`` is assumed to return a hex string, because
    its output is hex-decoded here - confirm against its definition.

    returns => the Base58-encoded WIF key, as returned by ``b58encode``
    """
    # binascii.unhexlify is what the Python 2 "hex" codec uses internally,
    # and unlike str.decode("hex") it also exists on Python 3.
    import binascii

    hex_key_with_prefix = "80" + hex_key
    h1 = hash_sha256(binascii.unhexlify(hex_key_with_prefix))
    h2 = hash_sha256(binascii.unhexlify(h1))
    checksum = h2[0:8]
    wif_key_before_base58Check = hex_key_with_prefix + checksum
    wif_key = b58encode(binascii.unhexlify(wif_key_before_base58Check))
    return wif_key


################################################################################################
#
# Bitcoin helper functions
#
################################################################################################
def keypair(seed=None):
    """Build a signing key and its derived keys from a seed.

    Args:
        seed: seed passed to ``nacl.signing.SigningKey``; when falsy, 32
            random bytes from ``nacl.utils.random`` are used instead.

    Returns:
        dict: the signing key, verify key, curve25519 private key and its
        public key, each with a ``*_b58`` base58-encoded companion entry,
        plus the ``seed`` that was used.
    """
    if not seed:
        seed = nacl.utils.random(32)

    signing_key = nacl.signing.SigningKey(seed=seed)
    private_key = signing_key.to_curve25519_private_key()
    verify_key = signing_key.verify_key
    public_key = private_key.public_key

    to_b58 = base58.b58encode
    return {
        'sign': signing_key,
        'sign_b58': to_b58(signing_key.encode()),
        'verify': verify_key,
        'verify_b58': to_b58(verify_key.encode()),
        'private': private_key,
        'private_b58': to_b58(private_key.encode()),
        'public': public_key,
        'public_b58': to_b58(public_key.encode()),
        'seed': seed,
    }
def testSponsorDisclosesEncryptedAttribute(addedEncryptedAttribute,
                                           symEncData,
                                           looper,
                                           userSignerA,
                                           sponsorSigner,
                                           sponsor):
    """Submit an ATTRIB operation carrying the boxed secret key and check it.

    The secret key and the TXN_ID of the previously added encrypted attribute
    are JSON-serialised and encrypted with a ``libnacl.public.Box`` built from
    ``sponsorSigner.naclSigner.keyraw`` and ``userSignerA.naclSigner.verraw``.
    The nonce and ciphertext are sent base58-encoded.
    """
    senderKey = sponsorSigner.naclSigner.keyraw
    recipientKey = userSignerA.naclSigner.verraw
    box = libnacl.public.Box(senderKey, recipientKey)

    payload = json.dumps({SKEY: symEncData.secretKey,
                          TXN_ID: addedEncryptedAttribute[TXN_ID]})
    # pack_nonce=False returns the nonce separately from the ciphertext
    nonce, boxedMsg = box.encrypt(payload.encode(), pack_nonce=False)

    op = {
        TARGET_NYM: userSignerA.verstr,
        TXN_TYPE: ATTRIB,
        NONCE: base58.b58encode(nonce),
        ENC: base58.b58encode(boxedMsg),
    }
    submitAndCheck(looper, sponsor, op, identifier=sponsorSigner.verstr)
def __init__(self, private_key=0):
    """Derive the signing/verifying keys and base58 address for a private key.

    Args:
        private_key: hex string of the private key, or ``0`` (the default)
            to draw 32 random bytes from ``os.urandom``.

    Sets ``private_key``, ``printable_pk``, ``sk``, ``vk``, ``public_key``,
    ``hashed_public_key``, ``checksum``, ``binary_addr`` and ``addr``.
    """
    if private_key == 0:
        self.private_key = os.urandom(32)
        self.printable_pk = str(binascii.hexlify(self.private_key), "ascii")
    else:
        self.printable_pk = private_key
        self.private_key = binascii.unhexlify(private_key.encode('ascii'))

    self.sk = ecdsa.SigningKey.from_string(self.private_key,
                                           curve=ecdsa.SECP256k1)
    self.vk = self.sk.verifying_key

    # hex form of the verifying key, prefixed with "04"
    self.public_key = b"04" + binascii.hexlify(self.vk.to_string())

    # RIPEMD-160 of SHA-256 of the public key bytes, hex-encoded, prefixed "00"
    sha_digest = hashlib.sha256(binascii.unhexlify(self.public_key)).digest()
    ripemd160 = hashlib.new('ripemd160', sha_digest)
    self.hashed_public_key = b"00" + binascii.hexlify(ripemd160.digest())

    # checksum: first four bytes of the double SHA-256, kept as hex
    versioned = binascii.unhexlify(self.hashed_public_key)
    double_sha = hashlib.sha256(hashlib.sha256(versioned).digest()).digest()
    self.checksum = binascii.hexlify(double_sha[:4])

    self.binary_addr = binascii.unhexlify(
        self.hashed_public_key + self.checksum)
    self.addr = base58.b58encode(self.binary_addr)
def sendAsset(pubKey, privKey, recipient, assetId, amount, txfee):
    """Sign an asset transfer and POST it to the node's broadcast endpoint.

    Args:
        pubKey: base58-encoded sender public key.
        privKey: base58-encoded private key used for signing.
        recipient: base58-encoded recipient address.
        assetId: base58-encoded asset id.
        amount: amount to transfer (packed as an unsigned 64-bit integer).
        txfee: transaction fee (packed as an unsigned 64-bit integer).

    The request goes to ``http://NODE_IP:NODE_PORT/assets/broadcast/transfer``
    as JSON. Nothing is returned.
    """
    timestamp = int(time.time() * 1000)
    sData = '\4' + base58.b58decode(pubKey) + '\1' + base58.b58decode(assetId) + '\0' + struct.pack(">Q", timestamp) + struct.pack(">Q", amount) + struct.pack(">Q", txfee) + base58.b58decode(recipient) + '\0\0'
    random64 = os.urandom(64)
    signature = base58.b58encode(curve.calculateSignature(random64, base58.b58decode(privKey), sData))
    data = json.dumps({
        "assetId": assetId,
        "senderPublicKey": pubKey,
        "recipient": recipient,
        "amount": amount,
        "fee": txfee,
        "timestamp": timestamp,
        "attachment": "",
        "signature": signature
    })
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, "http://%s:%s/assets/broadcast/transfer" % (NODE_IP, NODE_PORT))
        c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json', 'Accept: application/json'])
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, data)
        c.perform()
    finally:
        # Release the curl handle even when setopt/perform raises.
        c.close()
def invalid_verkey_tdir(tdir_for_func):
    """Fill a ledger in *tdir_for_func* with three txns, the second invalid.

    Each txn targets the base58 encoding of ``b'whatever'``, except the one
    at index 1, whose TARGET_NYM is replaced with the string "invalid====".
    The ledger is stopped afterwards.
    """
    ledger = Ledger(CompactMerkleTree(), dataDir=tdir_for_func)
    for index in range(3):
        label = str(index)
        details = {
            NAME: label,
            ALIAS: 'test' + label,
            SERVICES: [VALIDATOR],
        }
        txn = {
            TXN_TYPE: '0',
            TARGET_NYM: base58.b58encode(b'whatever'),
            IDENTIFIER: "Th7MpTaRZVRYnPiabds81Y",
            DATA: details,
        }
        if index == 1:
            txn[TARGET_NYM] = "invalid===="
        ledger.add(txn)
    ledger.stop()
def __init__(self, verkey, identifier=None):
    """Initialise from a verkey, optionally resolved against an identifier.

    Args:
        verkey: verification key. A leading '~' marks an abbreviated key
            that is expanded by prepending the decoded identifier.
        identifier: optional base58 identifier. If it decodes to 32 bytes
            and no verkey is given, it is used as the verkey itself.

    Raises:
        AssertionError: if no verkey could be determined.
        InvalidKey: if assigning ``self.verkey`` fails for any reason.
    """
    originalVerkey = verkey  # kept only for the error message below
    self._verkey = None
    self._vr = None

    if identifier:
        rawIdr = b58decode(identifier)
        if len(rawIdr) == 32 and not verkey:
            # assume cryptonym
            verkey = identifier

    assert verkey, 'verkey must be provided'

    if verkey[0] == '~':
        # abbreviated: full key = identifier bytes + remaining verkey bytes
        fullRaw = b58decode(identifier) + b58decode(verkey[1:])
        verkey = b58encode(fullRaw)

    try:
        self.verkey = verkey
    except Exception as ex:
        raise InvalidKey("verkey {}".format(originalVerkey)) from ex
def _has_valid_checksum(candidate):
    """Return True if the last 4 decoded bytes match the double-SHA-256 checksum."""
    clear = base58.b58decode(str(candidate))
    ori = clear[:-4]
    chk = clear[-4:]
    rechk = hashlib.sha256(hashlib.sha256(ori).digest()).digest()
    return chk == rechk[:4]


def solve(x):
    """Brute-force a Base58 string whose checksum fails because of typos.

    Every single-character substitution is tried first, then every pair of
    substitutions at two distinct positions. Each candidate is printed
    before it is checked.

    Args:
        x: the (possibly corrupted) Base58 string.

    Returns:
        The first candidate whose trailing four decoded bytes equal the
        first four bytes of the double SHA-256 of the preceding bytes, or
        ``None`` when no candidate verifies.
    """
    base58char = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'

    # Pass 1: replace one character.
    for i in range(len(x)):
        for char in base58char:
            cry = x[:i] + char + x[i + 1:]
            print(cry)
            if _has_valid_checksum(cry):
                return cry

    # Pass 2: replace two characters at distinct positions.
    for i in range(len(x)):
        for j in range(len(x)):
            if i == j:
                continue
            for charI in base58char:
                first = x[:i] + charI + x[i + 1:]
                for charJ in base58char:
                    cry = first[:j] + charJ + first[j + 1:]
                    print(cry)
                    if _has_valid_checksum(cry):
                        return cry

    return None
def hexb58(hex):
    """Base58-encode the bytes represented by the hex string *hex*.

    Args:
        hex: hexadecimal string (the name shadows the builtin but is kept
            for interface compatibility).

    Returns:
        The result of ``base58.b58encode`` on the decoded bytes.
    """
    # binascii.unhexlify backs the Python 2 'hex_codec' and, unlike
    # str.decode('hex_codec'), also works on Python 3.
    import binascii

    return base58.b58encode(binascii.unhexlify(hex))
def hash_to_address(version, hash):
    """Base58-encode ``version + hash`` followed by a 4-byte checksum.

    The checksum is the first four bytes of ``double_sha256`` over the
    versioned payload.
    """
    payload = version + hash
    checksum = double_sha256(payload)[:4]
    return base58.b58encode(payload + checksum)
def _generate(self, privateKey='', seed=''):
    """Populate seed, address, public key and private key of this object.

    Args:
        privateKey: base58-encoded private key; when empty, the key is
            generated from the seed hash.
        seed: seed phrase; when both it and *privateKey* are empty, a
            15-word phrase is drawn from ``wordList`` using ``os.urandom``.

    Sets ``self.seed``, ``self.address``, ``self.publicKey`` and
    ``self.privateKey`` (the last three base58-encoded).
    """
    self.seed = seed
    if not privateKey and not seed:
        wordCount = 2048
        words = []
        for _ in range(5):
            r = crypto.bytes2str(os.urandom(4))
            # combine the four characters into one 32-bit number, r[0] highest
            x = (ord(r[3])) + (ord(r[2]) << 8) + (ord(r[1]) << 16) + (ord(r[0]) << 24)
            quotient = int(x / wordCount) >> 0
            w1 = x % wordCount
            w2 = (quotient + w1) % wordCount
            w3 = ((int(quotient / wordCount) >> 0) + w2) % wordCount
            words.extend([wordList[w1], wordList[w2], wordList[w3]])
        self.seed = ' '.join(words)

    seedHash = crypto.hashChain(('\0\0\0\0' + self.seed).encode('utf-8'))
    accountSeedHash = crypto.sha256(seedHash)

    if privateKey:
        privKey = base58.b58decode(privateKey)
    else:
        privKey = curve.generatePrivateKey(accountSeedHash)
    pubKey = curve.generatePublicKey(privKey)

    # address = chr(1) + chain id + first 20 of the public-key hash chain,
    # followed by the first 4 of the hash chain over that prefix
    unhashedAddress = chr(1) + str(pywaves.CHAIN_ID) + crypto.hashChain(pubKey)[0:20]
    addressHash = crypto.hashChain(crypto.str2bytes(unhashedAddress))[0:4]

    self.address = base58.b58encode(crypto.str2bytes(unhashedAddress + addressHash))
    self.publicKey = base58.b58encode(pubKey)
    self.privateKey = base58.b58encode(privKey)
def sendWaves(self, recipient, amount, attachment='', txFee=pywaves.DEFAULT_TX_FEE, timestamp=0):
    """Build, sign and submit a Waves transfer to *recipient*.

    Args:
        recipient: object whose ``address`` attribute is the base58 target.
        amount: amount to send; must be greater than zero.
        attachment: optional attachment text.
        txFee: transaction fee.
        timestamp: millisecond timestamp; ``0`` means "use the current time".

    Returns:
        The result of ``pywaves.wrapper('/assets/broadcast/transfer', ...)``,
        or ``None`` after logging an error when the private key is missing,
        the amount is not positive, or (unless ``pywaves.OFFLINE``) the
        balance is below ``amount + txFee``.
    """
    if not self.privateKey:
        logging.error('Private key required')
        return None
    if amount <= 0:
        logging.error('Amount must be > 0')
        return None
    if not pywaves.OFFLINE and self.balance() < amount + txFee:
        logging.error('Insufficient Waves balance')
        return None

    if timestamp == 0:
        timestamp = int(time.time() * 1000)

    # binary form of the transaction that gets signed
    sData = (b'\4'
             + base58.b58decode(self.publicKey)
             + b'\0\0'
             + struct.pack(">Q", timestamp)
             + struct.pack(">Q", amount)
             + struct.pack(">Q", txFee)
             + base58.b58decode(recipient.address)
             + struct.pack(">H", len(attachment))
             + crypto.str2bytes(attachment))
    signature = crypto.sign(self.privateKey, sData)

    request = {
        "senderPublicKey": self.publicKey,
        "recipient": recipient.address,
        "amount": amount,
        "fee": txFee,
        "timestamp": timestamp,
        "attachment": base58.b58encode(crypto.str2bytes(attachment)),
        "signature": signature,
    }
    return pywaves.wrapper('/assets/broadcast/transfer', json.dumps(request))
def sendAsset(self, recipient, asset, amount, attachment='', txFee=pywaves.DEFAULT_TX_FEE, timestamp=0):
    """Build, sign and submit an asset transfer to *recipient*.

    Args:
        recipient: object whose ``address`` attribute is the base58 target.
        asset: object providing ``assetId`` and ``status()``.
        amount: amount to send; must be greater than zero.
        attachment: optional attachment text.
        txFee: transaction fee.
        timestamp: millisecond timestamp; ``0`` means "use the current time".

    Returns:
        The result of ``pywaves.wrapper('/assets/broadcast/transfer', ...)``,
        or ``None`` after logging an error when a precondition fails
        (missing private key, unissued asset, non-positive amount,
        insufficient asset or Waves balance; the asset and balance checks
        are skipped when ``pywaves.OFFLINE`` is set).
    """
    if not self.privateKey:
        logging.error('Private key required')
        return None
    if not pywaves.OFFLINE and not asset.status():
        logging.error('Asset not issued')
        return None
    if amount <= 0:
        logging.error('Amount must be > 0')
        return None
    if not pywaves.OFFLINE and self.balance(asset.assetId) < amount:
        logging.error('Insufficient asset balance')
        return None
    if not pywaves.OFFLINE and self.balance() < txFee:
        logging.error('Insufficient Waves balance')
        return None

    if timestamp == 0:
        timestamp = int(time.time() * 1000)

    # binary form of the transaction that gets signed
    sData = (b'\4'
             + base58.b58decode(self.publicKey)
             + b'\1'
             + base58.b58decode(asset.assetId)
             + b'\0'
             + struct.pack(">Q", timestamp)
             + struct.pack(">Q", amount)
             + struct.pack(">Q", txFee)
             + base58.b58decode(recipient.address)
             + struct.pack(">H", len(attachment))
             + crypto.str2bytes(attachment))
    signature = crypto.sign(self.privateKey, sData)

    request = {
        "assetId": asset.assetId,
        "senderPublicKey": self.publicKey,
        "recipient": recipient.address,
        "amount": amount,
        "fee": txFee,
        "timestamp": timestamp,
        "attachment": base58.b58encode(crypto.str2bytes(attachment)),
        "signature": signature,
    }
    return pywaves.wrapper('/assets/broadcast/transfer', json.dumps(request))
def sign(privateKey, message):
    """Sign *message* with a base58-encoded private key.

    64 bytes from ``os.urandom`` are passed to ``curve.calculateSignature``
    along with the decoded key and the message.

    Returns:
        The signature, base58-encoded.
    """
    random64 = os.urandom(64)
    rawKey = base58.b58decode(privateKey)
    rawSignature = curve.calculateSignature(random64, rawKey, message)
    return base58.b58encode(rawSignature)
def id(message):
    """Return the base58 encoding of the SHA-256 digest of *message*."""
    digest = hashlib.sha256(message).digest()
    return base58.b58encode(digest)
def simpleEncrypt(self, passphrase, data):
    """Encrypt data with a key and IV derived from a passphrase.

    Args:
        passphrase (str): passphrase to use for encryption.
        data (str/bytes): original data.

    Returns:
        (str): base58-encoded encrypted data.
    """
    # 32 and 16 are the sizes requested from EVP_BytesToKey for key and IV
    key, iv = self.EVP_BytesToKey(passphrase, 32, 16)
    ciphertext = self.encrypt(data, key, iv)
    return base58.b58encode(ciphertext)
def bytes2ascii(s):
    """Return the Base58 encoding of *s*.

    >>> bytes2ascii(b"test")
    '3yZe7d'
    """
    encoded = b58encode(s)
    return encoded
def pet2ascii(p):
    """Serialise *p* with ``encode`` and return its Base58 encoding.

    >>> from petlib.ec import EcGroup, EcPt
    >>> G = EcGroup()
    >>> pt = EcPt(G)
    >>> pet2ascii(pt)
    '3Xw3vNAdCmDLs'
    """
    serialised = encode(p)
    return b58encode(serialised)
def apply_creation_cell(acc, update):
    """Return a deep copy of *acc* with 'entity' taken from *update*.

    ``result['entity']`` is set to the base58 encoding of
    ``update['entity']['@link']``. If that lookup raises ``KeyError`` the
    copy is returned without the change.
    """
    result = copy.deepcopy(acc)
    try:
        link = update['entity']['@link']
        result['entity'] = base58.b58encode(link)
    except KeyError:
        # no entity link in this update: leave the copy untouched
        pass
    return result
def get_object_chain(reference, acc):
    """Follow 'chain' links from *reference*, prepending each object to *acc*.

    Each object is fetched with ``get_db().get(reference,
    retry_if_missing=True)``. The next reference is the base58 encoding of
    ``obj['chain']['@link']``; the walk ends when that lookup raises
    ``KeyError`` or when *reference* is ``None``.

    Returns:
        list: the visited objects, last visited first, followed by the
        original contents of *acc* (or *acc* itself if *reference* is None).
    """
    while reference is not None:
        db = get_db()
        obj = db.get(reference, retry_if_missing=True)
        try:
            following = base58.b58encode(obj['chain']['@link'])
        except KeyError:
            following = None
        acc = [obj] + acc
        reference = following
    return acc
def stringify_refs(obj):
    """Return a copy of *obj* with string '@link' values base58-encoded.

    Nested dictionaries are processed recursively. A value stored under the
    key '@link' is replaced by its base58 encoding when it is a string; if
    encoding raises ``ValueError``, ``AttributeError`` or ``TypeError`` the
    original value is kept (best effort).

    Args:
        obj: mapping to convert.

    Returns:
        dict: a new dictionary; *obj* itself is not modified.
    """
    try:
        string_types = basestring  # Python 2: covers both str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring

    res = {}
    # .items() works on both Python 2 and 3; .iteritems() is Python 2 only.
    for k, v in obj.items():
        if isinstance(v, dict):
            v = stringify_refs(v)
        if k == u'@link' and isinstance(v, string_types):
            try:
                v = base58.b58encode(v)
            except (ValueError, AttributeError, TypeError):
                # deliberately keep the raw value when it cannot be encoded
                pass
        res[k] = v
    return res
def multihash_base58(self):
    """Return ``self.multihash`` encoded with ``b58encode``."""
    raw = self.multihash
    return b58encode(raw)
def ref_base58(ref):
    """Return the best available string form of a reference.

    Three conversions are attempted in order, each later success overriding
    the earlier result: ``ref.multihash_base58()``, ``ref.reference``, and
    the base58 encoding of ``ref['@link']``. If none applies, *ref* is
    returned unchanged.
    """
    text = ref

    # 1) objects exposing a multihash_base58() method
    try:
        text = ref.multihash_base58()
    except AttributeError:
        pass

    # 2) objects exposing a .reference attribute
    try:
        text = ref.reference
    except AttributeError:
        pass

    # 3) mappings holding an '@link' entry
    try:
        link = ref['@link']
        text = base58.b58encode(link)
    except (KeyError, ValueError, TypeError):
        pass

    return text
def doAttrDisclose(self, origin, target, txnId, key):
    """Submit a DISCLO operation carrying *key*, boxed for *target*.

    Args:
        origin: base58 key of the sender; also used as the identifier of
            the submitted request.
        target: base58 key of the recipient.
        txnId: value stored under TXN_ID in the encrypted payload.
        key: value stored under SKEY in the encrypted payload.
    """
    box = libnacl.public.Box(b58decode(origin), b58decode(target))

    payload = json.dumps({TXN_ID: txnId, SKEY: key}).encode()
    # pack_nonce=False returns the nonce separately from the ciphertext
    nonce, boxedMsg = box.encrypt(payload, pack_nonce=False)

    op = {
        TARGET_NYM: target,
        TXN_TYPE: DISCLO,
        NONCE: b58encode(nonce),
        DATA: b58encode(boxedMsg),
    }
    self.submit(op, identifier=origin)
def client1Signer():
    """Return a DidSigner built from a fixed seed, after a sanity check.

    The signer's identifier and its verkey without the first character are
    converted with ``friendlyToRaw``, concatenated and base58-encoded; the
    result must match a known value.
    """
    signer = DidSigner(seed=b'client1Signer secret key........')

    raw_identifier = friendlyToRaw(signer.identifier)
    raw_verkey_tail = friendlyToRaw(signer.verkey[1:])
    testable_verkey = base58.b58encode(raw_identifier + raw_verkey_tail)

    assert testable_verkey == '6JvpZp2haQgisbXEXE9NE6n3Tuv77MZb5HdF9jS5qY8m'
    return signer
def hash_to_wallet_address(ba, address_version=23):
    """Convert a hash to a base58 wallet address with a 4-byte checksum.

    Args:
        ba: the hash as a bytearray (or bytes).
        address_version: version byte prepended to the hash. Defaults to 23.

    Returns:
        The base58 encoding of version byte + hash + first four bytes of
        ``bin_dbl_sha256`` over version byte + hash.
    """
    # Honour the caller-supplied version byte; the previous code always
    # used 23 regardless of this parameter.
    sb = bytearray([address_version]) + ba
    c256 = bin_dbl_sha256(sb)[0:4]
    outb = sb + bytearray(c256)
    return base58.b58encode(bytes(outb))
def noKeyIdr(wallet):
    """Add the base58 encoding of sixteen ``b'1'`` bytes as an identifier.

    Returns:
        The first element of what ``wallet.addIdentifier`` returns.
    """
    raw = b'1' * 16
    added = wallet.addIdentifier(identifier=base58.b58encode(raw))
    return added[0]
def fullKeyIdr(wallet):
    """Add the base58 encoding of sixteen ``b'2'`` bytes as an identifier.

    Returns:
        The first element of what ``wallet.addIdentifier`` returns.
    """
    raw = b'2' * 16
    added = wallet.addIdentifier(identifier=base58.b58encode(raw))
    return added[0]
def create_code(self):
    """Assign a new base58 code to this object, retrying on collisions.

    The code is the base58 encoding of the SHA-256 digest of a string built
    from a fixed prefix, ``participant_1_email``, ``time.clock()``, the
    event and a random float.

    Raises:
        FieldError: if a code is already set.
    """
    if self.code:
        raise FieldError

    while True:
        hash_input = (
            "fim_rudi" +
            self.participant_1_email +
            str(time.clock()) +
            str(self.event) +
            str(random.random()))
        new_code = base58.b58encode(sha256(hash_input).digest())
        # if code already exists create another
        if not Team.objects.filter(code=new_code).exists():
            break

    self.code = new_code
def hash_160_to_btc_address(h160, v):
    """ Calculates the Bitcoin address of a given RIPEMD-160 hash from an
    elliptic curve public key.

    :param h160: RIPEMD-160 hash, either raw or as a hex string.
    :type h160: bytes
    :param v: version (prefix) used to calculate the Bitcoin address.

     Possible values:

        - 0 for main network (PUBKEY_HASH).
        - 111 For testnet (TESTNET_PUBKEY_HASH).
    :type v: int
    :return: The corresponding Bitcoin address, as returned by ``b58encode``.
    """
    # A value made only of hex digits is treated as a hex string and decoded.
    if match('^[0-9a-fA-F]*$', h160):
        h160 = unhexlify(h160)

    # Version byte followed by the hash.
    versioned = chr(v) + h160

    # Checksum: first four bytes of the double SHA-256 of the versioned hash.
    checksum = sha256(sha256(versioned).digest()).digest()[0:4]

    # Base58-encode versioned hash + checksum to obtain the address.
    return b58encode(versioned + checksum)
def hexPrivateKey_to_WIFkey(hpk):
    """Convert a hex private key to a base58 WIF key.

    The binary key from ``hexPrivateKey_to_binPrivateKey`` is wrapped as
    ``'\\x80' + key + '\\x01'``, followed by the first four bytes of the
    double SHA-256 of that string, and the result is base58-encoded.

    Args:
        hpk: private key in hex form.

    Returns:
        The WIF key, or ``None`` (after printing 'Error') when the binary
        key is not exactly 32 long.
    """
    bpk = hexPrivateKey_to_binPrivateKey(hpk)
    if len(bpk) != 32:
        # Function-call form prints the same text on Python 2 and is valid
        # syntax on Python 3, unlike the print statement.
        print('Error')
        return None
    wif = '\x80' + bpk + '\x01'
    wif = wif + hashlib.sha256(hashlib.sha256(wif).digest()).digest()[0:4]
    wif = b58encode(wif)
    return wif
def generate_coin_id(self):
    """Return a base58-encoded random coin id.

    A random integer with ``settings.coin_id_size`` bits is rendered as a
    decimal string, encoded to bytes and then base58-encoded.
    """
    random_bits = random.getrandbits(settings.coin_id_size)
    decimal_text = str(random_bits)
    return base58.b58encode(decimal_text.encode())
def noKeyIdr(wallet):
    """Add the base58 encoding of sixteen ``b'1'`` bytes as an identifier.

    Returns:
        The first element of what ``wallet.addIdentifier`` returns.
    """
    encoded = base58.b58encode(b'1' * 16)
    result = wallet.addIdentifier(identifier=encoded)
    return result[0]
def fullKeyIdr(wallet):
    """Add the base58 encoding of sixteen ``b'2'`` bytes as an identifier.

    Returns:
        The first element of what ``wallet.addIdentifier`` returns.
    """
    encoded = base58.b58encode(b'2' * 16)
    result = wallet.addIdentifier(identifier=encoded)
    return result[0]
def to_dict(self):
    """
    Generate a dict of the fulfillment

    The public key is always base58-encoded; the signature is base58-encoded
    when present and ``None`` otherwise.

    Returns:
        dict: representing the fulfillment
    """
    public_key = base58.b58encode(self.public_key)
    if self.signature:
        signature = base58.b58encode(self.signature)
    else:
        signature = None
    return {
        'type': Ed25519Sha256.TYPE_NAME,
        'public_key': public_key,
        'signature': signature,
    }

# TODO Adapt according to outcomes of
# https://github.com/rfcs/crypto-conditions/issues/16
def to_dict(self):
    """Generate a dict of the condition

    The hash is base58-encoded; type id, cost and subtypes are copied as-is.

    Returns:
        dict: representing the condition
    """
    encoded_hash = base58.b58encode(self.hash)
    details = {
        'type_id': self.type_id,
        'hash': encoded_hash,
        'cost': self.cost,
        'subtypes': self.subtypes,
    }
    return details
def encode(data):
    """Base58-encode *data* and call ``.encode()`` on the result."""
    encoded = base58.b58encode(data)
    return encoded.encode()
def base58encode(i):
    """Base58-encode the encoded bytes of ``str(i)``."""
    text = str(i)
    return base58.b58encode(text.encode())