我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.encode()。
def pbkdf2_hex(data, salt, iterations=DEFAULT_PBKDF2_ITERATIONS,
               keylen=None, hashfunc=None):
    """Like :func:`pbkdf2_bin`, but returns a hex-encoded string.

    .. versionadded:: 0.9

    :param data: the data to derive.
    :param salt: the salt for the derivation.
    :param iterations: the number of iterations.
    :param keylen: the length of the resulting key.  If not provided,
                   the digest size will be used.
    :param hashfunc: the hash function to use.  This can either be the
                     string name of a known hash function, or a function
                     from the hashlib module.  Defaults to sha1.
    """
    derived = pbkdf2_bin(data, salt, iterations, keylen, hashfunc)
    return to_native(codecs.encode(derived, 'hex_codec'))
def gzip(self, text, compression_level=6):
    """Write `text` to self.filepath as a gzip-compressed file.

    Appends '.gz' to self.filepath when missing (mutates the attribute).
    `compression_level` is the gzip compresslevel (default 6).  If the
    first write raises UnicodeEncodeError, retries after NFKD-normalising
    the text and encoding it as UTF-8 bytes."""
    try:
        import gzip
        # Make sure the destination carries the conventional .gz suffix.
        if not self.filepath.endswith(".gz"):
            self.filepath = self.filepath + ".gz"
        with gzip.open(self.filepath, 'wb', compresslevel=compression_level) as gzip_writer:
            gzip_writer.write(text)
    except UnicodeEncodeError as ue:
        # Fallback path: normalise and explicitly encode, then rewrite.
        import unicodedata
        norm_text = unicodedata.normalize('NFKD', text)  # NKFD normalization of the unicode data before write
        import codecs
        binary_data = codecs.encode(norm_text, "utf_8")
        with gzip.open(self.filepath, 'wb', compresslevel=compression_level) as gzip_writer:
            gzip_writer.write(binary_data)
    except Exception as e:
        # Any other failure: optionally report, then re-raise to the caller.
        if DEBUG_FLAG:
            sys.stderr.write("Naked Framework Error: unable to gzip compress the file with the gzip method (Naked.toolshed.file.py).")
        raise e

#------------------------------------------------------------------------------
# [ write method ]
#   Universal text file writer that writes by system default or utf-8 encoded unicode if throws UnicdeEncodeError
#   Tests: test_IO.py :: test_file_ascii_readwrite, test_file_ascii_readwrite_missing_file,
#          test_file_utf8_write_raises_unicodeerror
#------------------------------------------------------------------------------
def to_binary(self, message):
    """Return `message` as a string of '0'/'1' characters, eight bits per
    UTF-8 byte.  Occurrences of self.SILENCE_TOKEN are first replaced
    with self.SILENCE_ENCODING."""
    # Substitute the silence marker, then take the raw UTF-8 bytes.
    encoded = codecs.encode(
        message.replace(self.SILENCE_TOKEN, self.SILENCE_ENCODING), 'utf-8')
    bits = []
    for byte in encoded:
        # Python 2 iterates bytes as 1-char strings; Python 3 yields ints.
        try:
            byte = ord(byte)
        except TypeError:
            pass
        # bin() gives '0b...'; drop the prefix and left-pad to 8 bits.
        bits.append(bin(byte)[2:].zfill(8))
    return ''.join(bits)
def check_payment(self):
    """Check whether the Lightning payment for this invoice has settled.

    Returns False while the invoice is still pending (or not yet created);
    on settlement, marks this record 'complete', saves it, and returns True.
    """
    if self.status == 'pending_invoice':
        return False
    channel = grpc.insecure_channel(settings.LND_RPCHOST)
    stub = lnrpc.LightningStub(channel)
    r_hash_base64 = self.r_hash.encode('utf-8')
    # BUG FIX: the original wrapped the decoded bytes in str(), which on
    # Python 3 produces the textual repr "b'...'" rather than the raw
    # payment-hash bytes the RPC expects.  Pass the bytes through directly.
    r_hash_bytes = codecs.decode(r_hash_base64, 'base64')
    invoice_resp = stub.LookupInvoice(ln.PaymentHash(r_hash=r_hash_bytes))
    if invoice_resp.settled:
        # Payment complete
        self.status = 'complete'
        self.save()
        return True
    else:
        # Payment not received
        return False
def pbkdf2_hex(data, salt, iterations=DEFAULT_PBKDF2_ITERATIONS,
               keylen=None, hashfunc=None):
    """Like :func:`pbkdf2_bin`, but returns a hex-encoded string.

    .. versionadded:: 0.9

    :param data: the data to derive.
    :param salt: the salt for the derivation.
    :param iterations: the number of iterations.
    :param keylen: the length of the resulting key.  If not provided,
                   the digest size will be used.
    :param hashfunc: the hash function to use.  This can either be the
                     string name of a known hash function, or a function
                     from the hashlib module.  Defaults to sha256.
    """
    raw_key = pbkdf2_bin(data, salt, iterations, keylen, hashfunc)
    return to_native(codecs.encode(raw_key, 'hex_codec'))
def do_discover(self) -> Message:
    """Send a discovery handshake to the device.

    The handshake reveals the device id and timestamp, and must be repeated
    regularly to keep communication with the device alive.

    :rtype: Message
    :raises DeviceException: if no device answers at self.ip."""
    response = Device.discover(self.ip)
    if response is None:
        _LOGGER.error("Unable to discover a device at address %s", self.ip)
        raise DeviceException("Unable to discover the device %s" % self.ip)

    self._device_id = response.header.value.device_id
    self._device_ts = response.header.value.ts
    if self.debug > 1:
        _LOGGER.debug(response)
    _LOGGER.debug("Discovered %s with ts: %s, token: %s",
                  self._device_id, self._device_ts,
                  codecs.encode(response.checksum, 'hex'))
    return response
def format_s3_path(file, owner, dataset_name, path):
    """Render the S3 storage path for `path` from STORAGE_PATH_PATTERN.

    Pattern variables come from the file's info dict plus owner/dataset and
    several path-derived fields.  Raises ValueError when the pattern
    references an unknown variable."""
    params = dict(file)
    params.update({
        'owner': owner,
        'dataset': dataset_name,
        'path': path,
        'basename': os.path.basename(path),
        'dirname': os.path.dirname(path),
        'extension': os.path.splitext(path)[1],
    })
    if 'md5' in params:
        # Best effort: also expose the (base64-stored) md5 as hex.
        try:
            digest = base64.b64decode(params['md5'])
            params['md5_hex'] = codecs.encode(digest, 'hex').decode('ascii')
        except Exception:
            pass
    try:
        s3path = config['STORAGE_PATH_PATTERN'].format(**params)
    except KeyError as e:
        msg = ('STORAGE_PATH_PATTERN contains variable not found in file info: %s'
               % e)
        raise ValueError(msg)
    return s3path
def pbkdf2_hex(data, salt, iterations=DEFAULT_PBKDF2_ITERATIONS,
               keylen=None, hashfunc=None):
    """Like :func:`pbkdf2_bin` but returns a hex encoded string.

    .. versionadded:: 0.9

    :param data: the data to derive.
    :param salt: the salt for the derivation.
    :param iterations: the number of iterations.
    :param keylen: the length of the resulting key.  If not provided
                   the digest size will be used.
    :param hashfunc: the hash function to use.  This can either be the
                     string name of a known hash function or a function
                     from the hashlib module.  Defaults to sha1.
    """
    binary_key = pbkdf2_bin(data, salt, iterations, keylen, hashfunc)
    return to_native(codecs.encode(binary_key, 'hex_codec'))
def seal_aes_ctr_legacy(key_service, secret, digest_method=DEFAULT_DIGEST):
    """
    Encrypts `secret` using the key service.
    You can decrypt with the companion method `open_aes_ctr_legacy`.
    """
    # 64-byte key: one half encrypts the data, the other half keys the HMAC.
    key, encoded_key = key_service.generate_key_data(64)
    ciphertext, hmac = _seal_aes_ctr(secret, key, LEGACY_NONCE, digest_method)
    # NOTE(review): 'hmac' is hex *bytes* while the other fields are str --
    # preserved as-is since consumers may depend on that shape.
    return {
        'key': b64encode(encoded_key).decode('utf-8'),
        'contents': b64encode(ciphertext).decode('utf-8'),
        'hmac': codecs.encode(hmac, "hex_codec"),
        'digest': digest_method,
    }
def __init__(self, host='localhost', port=4001, user=None, password=None,
             connect_timeout=None, detect_types=0,
             max_redirects=UNLIMITED_REDIRECTS):
    """Set up a connection to an rqlite node.

    When both `user` and `password` are given, a Basic-Auth header is
    precomputed.  The special host ':memory:' spins up an ephemeral rqlited
    instance and connects to it."""
    self.messages = []
    self.host = host
    self.port = port
    self._headers = {}
    if user is not None and password is not None:
        credentials = '{}:{}'.format(user, password).encode('utf-8')
        token = codecs.encode(credentials, 'base64').decode('utf-8').rstrip('\n')
        self._headers['Authorization'] = 'Basic ' + token

    self.connect_timeout = connect_timeout
    self.max_redirects = max_redirects
    self.detect_types = detect_types
    # Cache the individual type-detection flags.
    self.parse_decltypes = detect_types & PARSE_DECLTYPES
    self.parse_colnames = detect_types & PARSE_COLNAMES

    self._ephemeral = None
    if host == ':memory:':
        self._ephemeral = _EphemeralRqlited().__enter__()
        self.host, self.port = self._ephemeral.http
    self._connection = self._init_connection()
def read_int(self, address, intsize=None):
    """
    Read an integer value from memory

    Args:
        - address: address to read (Int)
        - intsize: force read size (Int); defaults to self.intsize()

    Returns:
        - mem value (Int), or None if the read failed
    """
    if not intsize:
        intsize = self.intsize()
    raw = self.readmem(address, intsize)
    if not raw:
        return None
    # Memory is read little-endian: reverse the bytes before hex conversion.
    # NOTE(review): "0x" + bytes would raise on Python 3 -- this snippet
    # appears to target Python 2; confirm the interpreter.
    return to_int("0x" + codecs.encode(raw[::-1], 'hex'))
def close(self, deposit_txid, deposit_txid_signature):
    """Close the payment channel identified by `deposit_txid`.

    Requires that at least one payment was made, and that the customer has
    signed the deposit txid.  Returns the payment txid as a string."""
    channel = self._load_channel(deposit_txid)

    # A channel can only be closed after it has received a payment.
    if channel['payment_tx'] is None:
        raise Exception("No payment has been made to this channel.")

    # Verify that the customer signed the deposit txid.
    customer_key = channel['redeem_script'].customer_public_key
    signature = bitcoin.Signature.from_der(deposit_txid_signature)
    assert customer_key.verify(deposit_txid.encode(), signature), \
        "Invalid deposit txid signature."

    return str(channel['payment_tx'].hash)
def test_status_close_channel():
    """Test ability to get a channel's status and close it."""
    channel_server._db = DatabaseSQLite3(':memory:', db_dir='')
    client = _create_client_txs()

    # Closing a non-existent channel must fail.
    with pytest.raises(PaymentChannelNotFoundError):
        channel_server.close('fake', BAD_SIGNATURE)

    # Open a channel, pay into it, and redeem the payment.
    deposit_txid = channel_server.open(client.deposit_tx, client.redeem_script)
    payment_txid = channel_server.receive_payment(deposit_txid, client.payment_tx)
    channel_server.redeem(payment_txid)

    # Closing with an invalid signature must fail.
    with pytest.raises(TransactionVerificationError):
        channel_server.close(deposit_txid, BAD_SIGNATURE)

    # Closing with the customer's valid signature must succeed.
    valid_signature = codecs.encode(
        cust_wallet._private_key.sign(deposit_txid).to_der(), 'hex_codec')
    assert channel_server.close(deposit_txid, valid_signature)
def hash(self, key):
    """Compute portable hash for `key`.

    :param key: key to hash
    :return: 32-bit hash value
    """
    mask = 0xFFFFFFFF
    disk_key, _ = self.put(key)
    key_type = type(disk_key)

    # Dispatch on the concrete on-disk representation of the key.
    if key_type is sqlite3.Binary:
        return zlib.adler32(disk_key) & mask
    if key_type is TextType:
        return zlib.adler32(disk_key.encode('utf-8')) & mask  # pylint: disable=no-member
    if key_type in INT_TYPES:
        return disk_key % mask
    assert key_type is float
    return zlib.adler32(struct.pack('!d', disk_key)) & mask
def __init__(self, codestr: str, astdict: AstDict) -> None:
    self._astdict = astdict
    # Tokenize the source and build the noop extractor / position fixer.
    readline = BytesIO(codestr.encode('utf-8')).readline
    self._tokens: List[Token] = [Token(*tok) for tok in tokenize.tokenize(readline)]
    token_lines = _create_tokenized_lines(codestr, self._tokens)
    self.noops_sync = NoopExtractor(codestr, token_lines)
    self.pos_sync = LocationFixer(codestr, token_lines)
    self.codestr = codestr

    # Maps nodes to end positions; populated during parse().
    self._node2endpos = None

    # Global and Nonlocal share the same name-promotion visitor.
    self.visit_Global = self.visit_Nonlocal = self._promote_names
def visit_Bytes(self, node: "Node") -> "VisitResult":
    """Replace the node's bytes payload with text, recording the encoding.

    UTF-8-decodable payloads become plain text ('utf8'); anything else is
    stored base64-encoded ('base64')."""
    raw = node["s"]
    try:
        text = raw.decode()
        encoding = 'utf8'
    except UnicodeDecodeError:
        # Not valid UTF-8: fall back to a base64 representation.
        text = encode(raw, 'base64').decode().strip()
        encoding = 'base64'
    node.update({"s": text, "encoding": encoding})
    return node
def encode(self, input, errors='strict'):
    """Stateless ROT-13 encode; returns (transformed text, chars consumed)."""
    transformed = input.translate(rot13_map)
    return transformed, len(input)
def encode(self, input, final=False):
    """Incrementally ROT-13 encode one chunk (the codec is stateless)."""
    transformed = input.translate(rot13_map)
    return transformed
def getregentry():
    # Registry entry for the 'rot-13' codec, wiring up the stateless codec,
    # the incremental variants, and the stream classes.
    return codecs.CodecInfo(
        name='rot-13',
        encode=Codec().encode,
        decode=Codec().decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
        # rot-13 maps str -> str, so it is not a text<->bytes encoding.
        _is_text_encoding=False,
    )

### Map
def rot13(infile, outfile):
    """Copy `infile` to `outfile`, ROT-13 transforming the text."""
    text = infile.read()
    outfile.write(codecs.encode(text, 'rot-13'))
def bytes_to_hex_str(arg_bytes):
    """Return the lowercase hex string representation of `arg_bytes`."""
    hex_bytes = codecs.encode(arg_bytes, 'hex')
    return hex_bytes.decode('ascii')
def hex_str_to_bytes(arg_str):
    """Return the bytes value for a hex string such as '00ff'."""
    ascii_bytes = arg_str.encode('ascii')
    return codecs.decode(ascii_bytes, 'hex')
def create_42_guid(sensor_id, proc_pid, proc_createtime):
    """Build a 32-hex-digit GUID ('xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx')
    from a sensor id (u32), process pid (u32) and creation time (u64),
    packed big-endian.

    :param sensor_id: sensor identifier, unsigned 32-bit int
    :param proc_pid: process id, unsigned 32-bit int
    :param proc_createtime: process creation time, unsigned 64-bit int
    :return: dash-separated GUID string
    """
    packed = struct.pack('>IIQ', sensor_id, proc_pid, proc_createtime)
    # BUG FIX: codecs.encode returns *bytes* on Python 3; without decoding,
    # the '%s' formatting below would embed "b'...'" reprs in the GUID.
    full_guid = codecs.encode(packed, "hex").decode('ascii')
    return '%s-%s-%s-%s-%s' % (full_guid[:8], full_guid[8:12],
                               full_guid[12:16], full_guid[16:20],
                               full_guid[20:])
def get(self, name, year='0'):
    # Look up the saved resume offset (in seconds, as a string) for the
    # bookmark identified by (name, year); if one exists, ask the user
    # whether to resume and return the chosen offset.  Any failure falls
    # through to the default offset '0'.
    try:
        offset = '0'
        # Bookmarks disabled -> bail out (caught below, returns default).
        if not control.setting('bookmarks') == 'true': raise Exception()
        # Derive a stable id from the name/year characters.
        # NOTE(review): hashlib update() with str works on Python 2 only;
        # Python 3 requires bytes -- confirm the target interpreter.
        idFile = hashlib.md5()
        for i in name: idFile.update(str(i))
        for i in year: idFile.update(str(i))
        idFile = str(idFile.hexdigest())
        dbcon = database.connect(control.bookmarksFile)
        dbcur = dbcon.cursor()
        dbcur.execute("SELECT * FROM bookmark WHERE idFile = '%s'" % idFile)
        match = dbcur.fetchone()
        self.offset = str(match[1])
        dbcon.commit()
        # No saved position -> nothing to resume.
        if self.offset == '0': raise Exception()
        # Format the offset as HH:MM:SS for the prompt label.
        minutes, seconds = divmod(float(self.offset), 60) ; hours, minutes = divmod(minutes, 60)
        label = '%02d:%02d:%02d' % (hours, minutes, seconds)
        label = (control.lang(32502) % label).encode('utf-8')
        # Prefer a context menu when available; fall back to yes/no dialog.
        try: yes = control.dialog.contextmenu([label, control.lang(32501).encode('utf-8'), ])
        except: yes = control.yesnoDialog(label, '', '', str(name), control.lang(32503).encode('utf-8'), control.lang(32501).encode('utf-8'))
        # User chose to start over -> reset the offset.
        if yes: self.offset = '0'
        return self.offset
    except:
        return offset
def __init__(self, name, base_encoding, mapping):
    """Set up a codec layered on `base_encoding` with extra character
    mappings, and register its error handler under `name`."""
    self.name = name
    self.base_encoding = base_encoding
    self.mapping = mapping
    # Reverse map for decoding: byte sequence -> character.
    self.reverse = {seq: char for char, seq in mapping.items()}
    # Longest mapped byte sequence; bounds lookahead when decoding.
    self.max_len = max(len(seq) for seq in mapping.values())
    self.info = codecs.CodecInfo(name=self.name,
                                 encode=self.encode,
                                 decode=self.decode)
    codecs.register_error(name, self.error)
def encode(self, input, errors='strict'):
    """Encode `input` with the base encoding, splicing in the byte
    replacements produced by self.error for unencodable characters.

    Simply delegating to codecs.encode(input, base, self.name) would rely
    on the error handler, but historically an *encode* error handler had to
    return a Unicode replacement that itself must be encodable; we need it
    to return raw bytes.  (Python 3.3 fixed this -- see the
    codecs.register_error docs.)  So the loop below does the splicing
    manually: encode what we can, let the handler supply bytes for the
    rest, and resume after the failure point.
    """
    assert errors == 'strict'
    total = len(input)
    result = b''
    remaining = input
    while remaining:
        try:
            result += codecs.encode(remaining, self.base_encoding)
            remaining = ''  # everything converted
        except UnicodeEncodeError as exc:
            # Encode the clean prefix, then append the handler's bytes.
            result += codecs.encode(remaining[:exc.start], self.base_encoding)
            replacement, resume = self.error(exc)
            result += replacement
            remaining = remaining[resume:]
    return result, total
def safe_str_cmp(a, b):
    """This function compares strings in somewhat constant time.  This
    requires that the length of at least one string is known in advance.

    Returns `True` if the two strings are equal, or `False` if they are not.

    .. versionadded:: 0.7
    """
    # Normalise both operands to bytes before comparing.
    if isinstance(a, text_type):
        a = a.encode('utf-8')
    if isinstance(b, text_type):
        b = b.encode('utf-8')

    # Prefer the C implementation when the runtime provides one.
    if _builtin_safe_str_cmp is not None:
        return _builtin_safe_str_cmp(a, b)

    if len(a) != len(b):
        return False

    # XOR-accumulate differences so the loop always runs to completion.
    diff = 0
    if PY2:
        for x, y in izip(a, b):
            diff |= ord(x) ^ ord(y)
    else:
        for x, y in izip(a, b):
            diff |= x ^ y
    return diff == 0
def _hash_internal(method, salt, password):
    """Internal password hash helper.  Supports plaintext without salt,
    unsalted and salted passwords.  In case salted passwords are used
    hmac is used.

    Returns a ``(hash_string, actual_method)`` tuple; for PBKDF2 the
    actual method records the effective iteration count.
    """
    # 'plain' stores the password unchanged (no salt, no hashing).
    if method == 'plain':
        return password, method
    if isinstance(password, text_type):
        password = password.encode('utf-8')
    if method.startswith('pbkdf2:'):
        # Method format: 'pbkdf2:<hashname>[:<iterations>]'.
        args = method[7:].split(':')
        if len(args) not in (1, 2):
            raise ValueError('Invalid number of arguments for PBKDF2')
        method = args.pop(0)
        # Missing/zero iteration count falls back to the default.
        iterations = args and int(args[0] or 0) or DEFAULT_PBKDF2_ITERATIONS
        is_pbkdf2 = True
        actual_method = 'pbkdf2:%s:%d' % (method, iterations)
    else:
        is_pbkdf2 = False
        actual_method = method
    hash_func = _hash_funcs.get(method)
    if hash_func is None:
        raise TypeError('invalid method %r' % method)
    if is_pbkdf2:
        if not salt:
            raise ValueError('Salt is required for PBKDF2')
        rv = pbkdf2_hex(password, salt, iterations, hashfunc=hash_func)
    elif salt:
        # Salted (non-PBKDF2) hashing: HMAC keyed with the salt.
        if isinstance(salt, text_type):
            salt = salt.encode('utf-8')
        rv = hmac.HMAC(salt, password, hash_func).hexdigest()
    else:
        # Unsalted: plain digest of the password.
        h = hash_func()
        h.update(password)
        rv = h.hexdigest()
    return rv, actual_method
def explain_code():
    """Web view: explain the submitted code and show its hex dump.

    Reads 'code' from the form, tries to optimise it via the explainer
    (falling back to the raw bytes on failure), and returns an indented
    explanation plus the upper-case hex of the bytes.
    """
    code = request.form.get("code", "")
    error = ""
    # Note: explainer.optimise(...) runs eagerly here (it is the outermost
    # iterable of the generator expression), so this try does catch it.
    try:
        hexified = (codecs.encode(bytes([byte]), "hex_codec")
                    for byte in explainer.optimise(bytearray(code, "utf-8")))
    # BUG FIX: was a bare 'except:', which also swallows SystemExit and
    # KeyboardInterrupt; narrowed to Exception.
    except Exception:
        error = "Error whilst optimising hex"
        hexified = (codecs.encode(bytes([byte]), "hex_codec")
                    for byte in bytearray(code, "utf-8"))
    hex_code = b" ".join(hexified).decode("ascii").upper()
    try:
        return "\n{}\n{}\n{}".format(
            explainer.Explainer(bytearray(code, "utf-8"), []),
            error, hex_code).replace("\n", "\n ")
    except Exception:
        return "\n Error formatting explanation\n {}\n {}".format(error, hex_code)
def bytesToHexStr(bytesInput):
    """Converts byte array to hex str

    :param bytesInput: byte array to convert
    :type bytesInput: byte-array
    :return: Hex string representing bytesInput
    """
    hexBytes = codecs.encode(bytesInput, 'hex')
    return hexBytes.decode('ascii')
def test_dont_update_when_nothing_changes(self):
    # NOTE(review): the entire test body is wrapped in a triple-quoted
    # string (it is the function's docstring), so this test runs no
    # assertions and always passes.  Un-string the body to re-enable it,
    # or remove the method if it is intentionally retired.
    """
    runner = CliRunner()
    self.template.content = codecs.encode(b"some foo", "base64")
    result = runner.invoke(update, ["--name", "testrepo", "--token", "token"])
    self.assertEqual(result.exit_code, 0)
    self.gh.assert_called_with("token")
    self.gh().get_user().get_repo.assert_called_with(name="testrepo")
    self.gh().get_user().get_repo().get_labels.assert_called_once_with()
    self.gh().get_user().get_repo().update_file.assert_not_called()
    """
def test_dont_upgrade_when_nothing_changes(self):
    """upgrade must leave the repo untouched when the template matches."""
    cli = CliRunner()
    # Remote template content identical to the local one.
    self.template.content = codecs.encode(b"some foo", "base64")
    result = cli.invoke(upgrade, ["--name", "testrepo", "--token", "token"])
    self.assertEqual(result.exit_code, 0)
    self.gh.assert_called_with("token")
    self.gh().get_user().get_repo.assert_called_with(name="testrepo")
    self.gh().get_user().get_repo().update_file.assert_not_called()