我们从 Python 开源项目中提取了以下 11 个代码示例，用于说明如何使用 base58.b58decode_check()。
def encode_fallback(fallback, currency):
    """Encode a fallback address as a tagged 'f' field.

    Only the 'bc' and 'tb' currencies are handled. The address is first
    passed to ``bech32_decode``; when that yields no human-readable part,
    the address is decoded as Base58Check instead.

    Args:
        fallback: The fallback address string.
        currency: Currency prefix; must be 'bc' or 'tb'.

    Returns:
        The result of ``tagged('f', ...)`` over the 5-bit version followed
        by the address program.

    Raises:
        NotImplementedError: For any currency other than 'bc' or 'tb'.
        ValueError: If a bech32 address belongs to another currency, if its
            witness version is greater than 16, or if a Base58Check address
            is neither P2PKH nor P2SH for this currency.
    """
    if currency not in ('bc', 'tb'):
        raise NotImplementedError("Support for currency {} not implemented".format(currency))

    hrp, data = bech32_decode(fallback)
    if not hrp:
        # Not bech32: fall back to a Base58Check address, whose first byte
        # identifies the address type.
        decoded = base58.b58decode_check(fallback)
        prefix = decoded[0]
        if is_p2pkh(currency, prefix):
            version = 17
        elif is_p2sh(currency, prefix):
            version = 18
        else:
            raise ValueError("Unknown address type for {}".format(currency))
        program = decoded[1:]
    else:
        if hrp != currency:
            raise ValueError("Not a bech32 address for this currency")
        version = data[0]
        if version > 16:
            raise ValueError("Invalid witness version {}".format(version))
        program = u5_to_bitarray(data[1:])

    return tagged('f', bitstring.pack("uint:5", version) + program)
def validate_sig(body, sig_str, pkh):
    """Check that ``sig_str`` is a valid signature over ``body`` for ``pkh``.

    Args:
        body: The request body that was signed.
        sig_str: The signature taken from the X-Bitcoin-Sig header.
        pkh: The Base58Check-encoded key hash of the signer.

    Returns:
        bool: True when every check passes.

    Raises:
        PermissionError: If ``pkh`` is None, if its length is outside the
            range 20..40, or if the signature is missing or does not verify.
            Any exception raised by ``base58.b58decode_check`` for a
            malformed ``pkh`` propagates unchanged.
    """
    if pkh is None:
        raise PermissionError("pkh not found.")

    # Decoding is done purely as a format check; the result is discarded.
    base58.b58decode_check(pkh)
    pkh_length = len(pkh)
    if not 20 <= pkh_length <= 40:
        raise PermissionError("Invalid pkh")

    try:
        if not sig_str:
            raise PermissionError("X-Bitcoin-Sig header not found.")
        verified = wallet.verify_bitcoin_message(body, sig_str, pkh)
        if not verified:
            raise PermissionError("X-Bitcoin-Sig header not valid.")
    except Exception as err:
        # Every failure above, including the two PermissionErrors, is logged
        # in detail and then reported to the caller with one generic message.
        logger.error("Failure: {0}".format(err))
        raise PermissionError("X-Bitcoin-Sig header validation failed.")

    return True
def from_b58check(private_key):
    """ Decodes a Base58Check encoded private-key.

    Args:
        private_key (str): A Base58Check encoded private key.

    Returns:
        PrivateKey: A PrivateKey object

    Raises:
        AssertionError: If the leading version byte is neither
            PrivateKey.TESTNET_VERSION nor PrivateKey.MAINNET_VERSION.
    """
    b58dec = base58.b58decode_check(private_key)
    version = b58dec[0]

    # Explicit check instead of an ``assert`` statement: asserts are removed
    # when Python runs with -O, which would silently disable this validation.
    # AssertionError is kept so existing callers see the same exception type.
    if version not in (PrivateKey.TESTNET_VERSION, PrivateKey.MAINNET_VERSION):
        raise AssertionError(
            "Unexpected private key version byte: {!r}".format(version))

    # Everything after the version byte is read as a big-endian integer.
    return PrivateKey(int.from_bytes(b58dec[1:], 'big'))
def from_b58check(key):
    """ Builds an HD key from its Base58Check serialization.

    The encoding must conform to the description in:
    https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki#serialization-format

    Args:
        key (str): A Base58Check encoded key.

    Returns:
        HDPrivateKey or HDPublicKey:
            Either an HD private or public key object, depending
            on what was serialized.
    """
    # Strip the Base58Check envelope first, then let HDKey parse the payload.
    raw_key = base58.b58decode_check(key)
    return HDKey.from_bytes(raw_key)
def address_to_key_hash(s):
    """ Splits a Bitcoin address into its version and the RIPEMD-160
        hash of the public key.

    Args:
        s (bytes): The Bitcoin address to decode

    Returns:
        (version, h160) (tuple): A tuple containing the version and
        RIPEMD-160 hash of the public key.
    """
    decoded = base58.b58decode_check(s)
    # First element is the version; the remainder is the hash.
    return decoded[0], decoded[1:]
def coinbaseMessage(previous_output, receiver_address, my_address, private_key):
    """Build and sign a raw single-input, single-output transaction.

    The input references ``previous_output`` with output index 0xffffffff
    and is signed against a ``76a914 <hash> 88ac`` script built from
    ``my_address``. The single output pays a fixed 1250033629 base units
    (100000000 * 12.50033629) to the same kind of script built from
    ``receiver_address``.

    The function works on byte strings throughout, so it runs on both
    Python 2 and Python 3. The previous implementation depended on the
    Python 2-only "hex" str codec and passed a float to ``struct.pack``,
    both of which fail on Python 3.

    Args:
        previous_output (str): Hex-encoded hash of the referenced
            transaction; it is byte-reversed before serialization.
        receiver_address (str): Base58Check address that receives the output.
        my_address (str): Base58Check address whose hash goes into the
            script that is signed.
        private_key (str): Hex-encoded raw secp256k1 private key.

    Returns:
        bytes: The serialized, signed transaction.
    """
    # Local import keeps the block self-contained; binascii behaves the same
    # on Python 2 and 3.
    import binascii

    # Drop the leading version byte of each decoded address.
    receiver_pubkey_hash = base58.b58decode_check(receiver_address)[1:]
    my_pubkey_hash = base58.b58decode_check(my_address)[1:]

    # Transaction header / trailer fields
    version = struct.pack("<L", 1)
    lock_time = struct.pack("<L", 0)
    hash_code = struct.pack("<L", 1)

    # Transaction input
    tx_in_count = struct.pack("<B", 1)
    outpoint_hash = binascii.unhexlify(previous_output)[::-1]
    outpoint_index = struct.pack("<L", 0xFFFFFFFF)
    # Same bytes as the hex template "76a914%s88ac" in the original code.
    in_script = b"\x76\xa9\x14" + my_pubkey_hash + b"\x88\xac"
    in_script_len = struct.pack("<B", len(in_script))
    sequence = b"\xff\xff\xff\xff"

    # Transaction output
    tx_out_count = struct.pack("<B", 1)
    # Integer amount: the "<Q" format needs an integer, and this is exactly
    # what 100000000 * 12.50033629 truncates to.
    value = struct.pack("<Q", 1250033629)
    pk_script = b"\x76\xa9\x14" + receiver_pubkey_hash + b"\x88\xac"
    pk_script_len = struct.pack("<B", len(pk_script))

    tx_to_sign = (version + tx_in_count + outpoint_hash + outpoint_index +
                  in_script_len + in_script + sequence + tx_out_count +
                  value + pk_script_len + pk_script + lock_time + hash_code)

    # Sign the double-SHA256 digest of the preimage.
    hashed_raw_tx = hashlib.sha256(hashlib.sha256(tx_to_sign).digest()).digest()
    sk = ecdsa.SigningKey.from_string(binascii.unhexlify(private_key),
                                      curve=ecdsa.SECP256k1)
    # 0x04 prefix followed by the verifying key's raw string form.
    public_key = b"\x04" + sk.verifying_key.to_string()
    sign = sk.sign_digest(hashed_raw_tx, sigencode=ecdsa.util.sigencode_der)

    # Complete transaction: DER signature + 0x01 byte, then the
    # length-prefixed public key.
    sigscript = (sign + b"\x01" +
                 struct.pack("<B", len(public_key)) + public_key)
    real_tx = (version + tx_in_count + outpoint_hash + outpoint_index +
               struct.pack("<B", len(sigscript) + 1) +
               struct.pack("<B", len(sign) + 1) + sigscript +
               sequence + tx_out_count + value + pk_script_len + pk_script +
               lock_time)
    return real_tx
def _flush(client, wallet, machine_auth, amount=None, payout_address=None, silent=False, to_primary=False):
    """ Flushes current off-chain buffer to the blockchain.

    Args:
        client (two1.server.rest_client.TwentyOneRestClient): an object for
            sending authenticated requests to the TwentyOne backend.
        wallet (two1.wallet.Wallet): a user's wallet instance (not read by
            this function's body).
        machine_auth: passed through to the wallet-selection and
            confirmation helpers.
        amount (int): The amount to be flushed. Should be more than 10k.
        payout_address (string): The address to flush the Bitcoins to.
        silent (boolean): If True, disables the confirmation prompt.
        to_primary (boolean): If True, flushes to the primary wallet.

    Returns:
        None. The function returns early when the payout address is not
        valid Base58Check or when the confirmation is declined.

    Raises:
        ServerRequestError: re-raised unless the server answered 401,
            400 with error "TO500", or 403 with error
            "social_account_required_to_flush"; those cases are only logged.
    """
    # An explicitly supplied payout address must decode as Base58Check.
    if payout_address:
        try:
            base58.b58decode_check(payout_address)
        except ValueError:
            logger.error(uxstring.UxString.flush_invalid_address)
            return

    # Work out which wallet, and which of its addresses, receives the flush.
    wallets = client.list_wallets()
    target_address, target_name = _select_flush_wallet(
        client, machine_auth, payout_address, wallets, to_primary)

    if not silent:
        # Without a user-specified payout_address the buffer is flushed to
        # the primary wallet.
        flush_is_local = payout_address is None
        confirmed = _show_confirmation(
            machine_auth, amount, wallets, target_address, target_name,
            to_primary, flush_is_local)
        if not confirmed:
            return

    # Perform the flush and report the outcome.
    try:
        response = client.flush_earnings(amount=amount, payout_address=target_address)
        if response.ok:
            logger.info(uxstring.UxString.flush_success.format(target_address))
    except exceptions.ServerRequestError as ex:
        status = ex.status_code
        if status == 401:
            logger.info(ex.message)
        elif status == 400 and ex.data.get("error") == "TO500":
            logger.info(uxstring.UxString.flush_not_enough_earnings.format(amount), fg="red")
        elif status == 403 and ex.data.get("error") == "social_account_required_to_flush":
            logger.info("You must connect a social account with your 21.co account before you can flush.", fg="red")
        else:
            raise ex
def PrivateKeyFromNEP2(nep2_key, passphrase):
    """
    Gets the private key from a NEP-2 encrypted private key

    Args:
        nep2_key (str): The nep-2 encrypted private key
        passphrase (str): The password to decrypt the private key with, as unicode string

    Returns:
        bytes: The private key

    Raises:
        ValueError: If ``nep2_key`` is missing or not 58 characters long,
            if it fails Base58Check decoding, or if the address derived
            from the decrypted key does not match the hash stored in the
            key (i.e. the passphrase is wrong).
    """
    if not nep2_key or len(nep2_key) != 58:
        # Only take len() of a non-empty key so that None produces the
        # intended ValueError rather than a TypeError from len(None).
        key_len = len(nep2_key) if nep2_key else 0
        raise ValueError('Please provide a nep2_key with a length of 58 bytes (LEN: {0:d})'.format(key_len))

    ADDRESS_HASH_SIZE = 4
    ADDRESS_HASH_OFFSET = len(NEP_FLAG) + len(NEP_HEADER)

    try:
        decoded_key = base58.b58decode_check(nep2_key)
    except Exception as e:
        # Keep the original failure attached as the cause for debugging.
        raise ValueError("Invalid nep2_key") from e

    # The address hash follows the flag and header; the encrypted key is
    # the last 32 bytes of the decoded payload.
    address_hash = decoded_key[ADDRESS_HASH_OFFSET:ADDRESS_HASH_OFFSET + ADDRESS_HASH_SIZE]
    encrypted = decoded_key[-32:]

    pwd_normalized = bytes(unicodedata.normalize('NFC', passphrase), 'utf-8')
    derived = scrypt.hash(pwd_normalized, address_hash,
                          N=SCRYPT_ITERATIONS,
                          r=SCRYPT_BLOCKSIZE,
                          p=SCRYPT_PARALLEL_FACTOR,
                          buflen=SCRYPT_KEY_LEN_BYTES)

    # First 32 bytes are the XOR mask, the remainder is the AES key.
    derived1 = derived[:32]
    derived2 = derived[32:]

    cipher = AES.new(derived2, AES.MODE_ECB)
    decrypted = cipher.decrypt(encrypted)
    private_key = xor_bytes(decrypted, derived1)

    # Now check that the address hashes match. If they don't, the password was wrong.
    kp_new = KeyPair(priv_key=private_key)
    kp_new_address = kp_new.GetAddress()
    first_digest = hashlib.sha256(kp_new_address.encode('utf-8')).digest()
    kp_new_address_hash = hashlib.sha256(first_digest).digest()[:ADDRESS_HASH_SIZE]
    if kp_new_address_hash != address_hash:
        raise ValueError("Wrong passphrase")

    return private_key