我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用codecs.decode()。
def translate_non_sgml_chars(data, enc='utf-8'):
    # type: (bytes, str) -> bytes
    """Substitute the control characters matched by the pattern below.

    ``data`` is decoded with ``enc`` (undecodable bytes are replaced), every
    character in the ranges U+0000-U+0008, U+000B, U+000C, U+000E-U+001F and
    U+007F-U+009F is substituted, and the text is encoded back with ``enc``.
    """
    # Unicode Character 'REPLACEMENT CHARACTER'
    replacement = u'\ufffd'

    def replace_non_sgml(m):
        # type: (Match) -> str
        value = ord(m.group(0))
        if value < 127 or value > 159:
            return replacement
        # Code points 127-159 are turned into a single byte and decoded as
        # windows-1252; values that codec cannot decode fall back to the
        # replacement character.
        try:
            return int2byte(value).decode('windows-1252')
        except UnicodeDecodeError:
            return replacement

    decoded = data.decode(enc, 'replace')
    pattern = unistr(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]')
    cleaned = re.sub(pattern, replace_non_sgml, decoded)
    return cleaned.encode(enc, 'replace')
def __call__(self, topic=None):
    """Write help for ``topic`` to ``sys.stdout`` as HTML.

    Without a topic only a short ``<span class=help>`` hint containing
    ``repr(self)`` is written.  ``sys.stdout`` is expected to be a stream
    object that provides ``_write()`` and ``reset()``.
    """
    if topic is None:
        sys.stdout._write('<span class=help>%s</span>' % repr(self))
        return

    import pydoc
    pydoc.help(topic)

    # reset() hands back whatever pydoc printed; it may arrive as bytes.
    captured = sys.stdout.reset()
    if isinstance(captured, bytes):
        captured = captured.decode('utf-8', 'ignore')

    # The first paragraph becomes the title, the remainder the body text.
    parts = _paragraph_re.split(captured)
    if len(parts) <= 1:  # pragma: no cover
        heading = 'Help'
        body = parts[0]
    else:
        heading = parts[0]
        body = '\n\n'.join(parts[1:])
    sys.stdout._write(HELP_HTML % {'title': heading, 'text': body})
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
             max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
    """Store the parser configuration on the instance.

    :param stream_factory: callable stored as ``self.stream_factory``;
                           ``default_stream_factory`` is used when ``None``.
    :param charset: stored as ``self.charset``.
    :param errors: stored as ``self.errors``.
    :param max_form_memory_size: stored as ``self.max_form_memory_size``.
    :param cls: container class stored as ``self.cls``; ``MultiDict`` is
                used when ``None``.
    :param buffer_size: chunk size in bytes; must be a multiple of 4 and at
                        least 1024.
    :raises AssertionError: if ``buffer_size`` violates either constraint.
    """
    self.charset = charset
    self.errors = errors
    self.max_form_memory_size = max_form_memory_size

    # Resolve the defaults *before* storing them.  Previously the attribute
    # was assigned first, so ``self.stream_factory`` stayed ``None`` whenever
    # the caller relied on the default.
    if stream_factory is None:
        stream_factory = default_stream_factory
    self.stream_factory = stream_factory

    if cls is None:
        cls = MultiDict
    self.cls = cls

    # make sure the buffer size is divisible by four so that we can base64
    # decode chunk by chunk
    assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
    # also the buffer size has to be at least 1024 bytes long or long headers
    # will freak out the system
    assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

    self.buffer_size = buffer_size
def check_payment(self):
    """
    Check whether the Lightning payment for this invoice has been received.

    Returns ``False`` immediately while the status is ``'pending_invoice'``.
    Otherwise the invoice is looked up over gRPC by its payment hash; when it
    is settled the status is set to ``'complete'``, the object is saved and
    ``True`` is returned.  An unsettled invoice yields ``False``.
    """
    if self.status == 'pending_invoice':
        return False

    channel = grpc.insecure_channel(settings.LND_RPCHOST)
    stub = lnrpc.LightningStub(channel)

    # r_hash is held as base64 text; the lookup needs the raw decoded bytes.
    # The result used to be wrapped in str(), which is a no-op on Python 2
    # but produces the textual repr "b'...'" on Python 3 instead of the hash.
    r_hash_base64 = self.r_hash.encode('utf-8')
    r_hash_bytes = codecs.decode(r_hash_base64, 'base64')
    invoice_resp = stub.LookupInvoice(ln.PaymentHash(r_hash=r_hash_bytes))

    if not invoice_resp.settled:
        # Payment not received
        return False

    # Payment complete
    self.status = 'complete'
    self.save()
    return True
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
             max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
    """Record the parser settings on the instance.

    ``stream_factory`` falls back to ``default_stream_factory`` and ``cls``
    to ``MultiDict`` when left as ``None``.  ``buffer_size`` must be a
    multiple of four and no smaller than 1024, otherwise an
    ``AssertionError`` is raised.
    """
    self.charset = charset
    self.errors = errors
    self.max_form_memory_size = max_form_memory_size

    if stream_factory is None:
        stream_factory = default_stream_factory
    self.stream_factory = stream_factory

    if cls is None:
        cls = MultiDict
    self.cls = cls

    # Base64 is decoded chunk by chunk, which only works when every chunk
    # length is divisible by four.
    assert buffer_size % 4 == 0, 'buffer size has to be divisible by 4'
    # Anything below 1024 bytes cannot cope with long headers.
    assert buffer_size >= 1024, 'buffer size has to be at least 1KB'

    self.buffer_size = buffer_size
def _real_extract(self, url):
    """Build the info dict (metadata plus m3u8 formats) for the video at ``url``."""
    video_id = self._match_id(url)
    webpage = self._download_webpage(url, video_id)

    login_wall = re.search(
        r"<(?:DIV|div) class='login-required-screen'>", webpage)
    if login_wall:
        self.raise_login_required('This video requires login')

    og_title = self._og_search_title(webpage)
    og_description = self._og_search_description(webpage)
    og_thumbnail = self._og_search_thumbnail(webpage)
    raw_duration = self._html_search_meta(
        'video:duration', webpage, 'duration')
    seconds = int_or_none(raw_duration)

    # The page's ``apu`` value is ROT13-decoded and then reversed to obtain
    # the playlist URL.
    apu = self._search_regex(r"apu='([^']+)'", webpage, 'apu')
    unrotated = codecs.decode(apu, 'rot_13')
    m3u8_url = unrotated[::-1]
    m3u8_formats = self._extract_m3u8_formats(m3u8_url, video_id, ext='mp4')

    return {
        'id': video_id,
        'title': og_title,
        'formats': m3u8_formats,
        'thumbnail': og_thumbnail,
        'description': og_description,
        'duration': seconds,
    }
def seal_aes_ctr_legacy(key_service, secret, digest_method=DEFAULT_DIGEST):
    """
    Seal `secret` with key material obtained from `key_service`.

    The companion method `open_aes_ctr_legacy` reverses the operation.
    Returns a dict with the base64 text of the encoded key (`key`) and of
    the ciphertext (`contents`), the hex-encoded HMAC (`hmac`) and the
    digest method that was used (`digest`).
    """
    # Ask for 64 bytes of key data; per the original note one half is meant
    # for data encryption and the other half for the HMAC.
    key, encoded_key = key_service.generate_key_data(64)
    ciphertext, hmac = _seal_aes_ctr(secret, key, LEGACY_NONCE, digest_method)

    key_text = b64encode(encoded_key).decode('utf-8')
    contents_text = b64encode(ciphertext).decode('utf-8')
    hmac_hex = codecs.encode(hmac, "hex_codec")
    return {
        'key': key_text,
        'contents': contents_text,
        'hmac': hmac_hex,
        'digest': digest_method,
    }
def _split_key_block(self, key_block):
    """Split ``key_block`` into a list of ``(key_id, key_text)`` tuples.

    Each entry consists of a number of ``self._number_width`` bytes, unpacked
    with ``self._number_format``, followed by NUL-terminated key text in
    ``self._encoding``.  The text is returned re-encoded as stripped UTF-8
    bytes.  An entry whose terminator is missing extends to the end of the
    block instead of failing or looping forever.

    :param key_block: bytes holding the concatenated entries.
    :returns: list of ``(key_id, key_text)`` in block order.
    """
    # key text ends with '\x00' (two NUL bytes for UTF-16)
    if self._encoding == 'UTF-16':
        delimiter = b'\x00\x00'
        width = 2
    else:
        delimiter = b'\x00'
        width = 1

    block_len = len(key_block)
    key_list = []
    key_start_index = 0
    while key_start_index < block_len:
        text_start = key_start_index + self._number_width
        # per the original note: the corresponding record's offset in the
        # record block
        key_id = unpack(self._number_format,
                        key_block[key_start_index:text_start])[0]

        # Step in units of ``width`` so a UTF-16 terminator is only matched
        # on a code-unit boundary.  Default to the end of the block so a
        # missing terminator cannot leave the end index unset or stale.
        key_end_index = block_len
        i = text_start
        while i < block_len:
            if key_block[i:i + width] == delimiter:
                key_end_index = i
                break
            i += width

        raw_text = key_block[text_start:key_end_index]
        key_text = (raw_text.decode(self._encoding, errors='ignore')
                    .encode('utf-8').strip())
        key_list.append((key_id, key_text))
        key_start_index = key_end_index + width
    return key_list
def pattern_env(self, *arg):
    """
    Set environment variable with a cyclic pattern
    Set "pattern" option for basic/extended pattern type
    Usage:
        MYNAME ENVNAME size[,offset]
    """
    (env, size) = normalize_argv(arg, 2)
    if size is None:
        self._missing_argument()

    # The raw second argument has the form "size[,offset]"; the appended
    # comma guarantees that the split always yields two fields.
    size_text, offset_text = (arg[1] + ",").split(",")[:2]
    size = to_int(size_text)
    offset = to_int(offset_text) if offset_text else 0
    if size is None or offset is None:
        self._missing_argument()

    pattern = cyclic_pattern(size, offset).decode('utf-8')
    peda.execute("set env %s %s" % (env, pattern))
    msg("Set environment %s = cyclic_pattern(%d, %d)" % (env, size, offset))

    return
def http_basic_auth(func):
    """Decorator that logs a user in from an HTTP Basic ``Authorization`` header.

    When ``request.META`` carries ``HTTP_AUTHORIZATION`` with the ``basic``
    scheme, the credentials are base64-decoded and passed to Django's
    ``authenticate``.  A user that authenticates and passes ``is_authorized``
    is logged in; any other outcome, including credentials that cannot be
    parsed, raises ``PermissionDenied``.  Requests without the header, or
    with a different scheme, reach ``func`` unchanged.
    """
    @wraps(func)
    def _decorator(request, *args, **kwargs):
        import binascii
        from django.contrib.auth import authenticate, login

        if "HTTP_AUTHORIZATION" in request.META:
            # NOTE(review): the header is handled as a byte string, exactly
            # as before -- confirm what type request.META holds on Python 3.
            # partition() tolerates a header with no space after the scheme,
            # which previously blew up on tuple unpacking.
            authmeth, _sep, auth = request.META["HTTP_AUTHORIZATION"].partition(b" ")
            if authmeth.lower() == b"basic":
                try:
                    auth = codecs.decode(auth.strip(), "base64")
                    username, password = auth.split(b":", 1)
                except (ValueError, binascii.Error):
                    # Invalid base64 or a missing "user:password" separator
                    # is a bad credential, not a server error.
                    raise PermissionDenied()
                user = authenticate(username=username, password=password)
                if user and is_authorized(user):
                    login(request, user)
                else:
                    raise PermissionDenied()
        return func(request, *args, **kwargs)
    return _decorator
def test_resume_archive_from_file(self, mock_resume_file_upload):
    """Resuming from a file object passes on the part size and decoded hashes."""
    part_size = 4
    listed_parts = [
        {'RangeInBytes': '0-3', 'SHA256TreeHash': '12'},
        {'RangeInBytes': '4-6', 'SHA256TreeHash': '34'},
    ]
    self.vault.list_all_parts = Mock(return_value={
        'PartSizeInBytes': part_size,
        'Parts': listed_parts,
    })

    self.vault.resume_archive_from_file(
        sentinel.upload_id, file_obj=sentinel.file_obj)

    # Tree hashes are expected hex-decoded and keyed by part index.
    expected_hashes = {
        0: codecs.decode('12', 'hex_codec'),
        1: codecs.decode('34', 'hex_codec'),
    }
    mock_resume_file_upload.assert_called_once_with(
        self.vault, sentinel.upload_id, part_size, sentinel.file_obj,
        expected_hashes)
def _load_channel(self, deposit_txid):
    """Load the channel stored in ``<deposit_txid>.server.tx``.

    Returns a dict with the deserialized ``deposit_tx``, ``redeem_script``
    and ``payment_tx`` (``None`` when the stored value is empty).  Raises
    ``PaymentChannelNotFoundError`` when the file does not exist.
    """
    channel_path = deposit_txid + ".server.tx"

    # Check if a payment has been made to this channel
    if not os.path.exists(channel_path):
        raise PaymentChannelNotFoundError("Payment channel not found.")

    # Load JSON channel
    with open(channel_path) as f:
        channel_json = json.load(f)

    # Deserialize into objects
    deposit_tx = bitcoin.Transaction.from_hex(channel_json['deposit_tx'])
    redeem_bytes = codecs.decode(channel_json['redeem_script'], 'hex_codec')
    redeem_script = PaymentChannelRedeemScript.from_bytes(redeem_bytes)
    if channel_json['payment_tx']:
        payment_tx = bitcoin.Transaction.from_hex(channel_json['payment_tx'])
    else:
        payment_tx = None

    return {
        'deposit_tx': deposit_tx,
        'redeem_script': redeem_script,
        'payment_tx': payment_tx,
    }
def open(self, deposit_tx, redeem_script):
    """Validate and store a new payment channel.

    :param deposit_tx: hex-serialized deposit transaction.
    :param redeem_script: hex-serialized payment channel redeem script.
    :raises AssertionError: if the redeem script cannot be parsed or the
        deposit transaction fails any of the checks below.

    The checks raise explicitly rather than via ``assert`` so that they
    still run when Python is started with ``-O``; the exception type is
    unchanged.
    """
    # Deserialize deposit tx and redeem script
    deposit_tx = bitcoin.Transaction.from_hex(deposit_tx)
    try:
        redeem_script = PaymentChannelRedeemScript.from_bytes(
            codecs.decode(redeem_script, 'hex_codec'))
    except ValueError:
        raise AssertionError("Invalid payment channel redeem script.")

    # Validate deposit tx
    if not len(deposit_tx.outputs) > 1:
        raise AssertionError("Invalid deposit tx outputs.")
    output_index = deposit_tx.output_index_for_address(redeem_script.hash160())
    if output_index is None:
        raise AssertionError("Missing deposit tx P2SH output.")
    output_script = deposit_tx.outputs[output_index].script
    if not output_script.is_p2sh():
        raise AssertionError("Invalid deposit tx output P2SH script.")
    if not output_script.get_hash160() == redeem_script.hash160():
        raise AssertionError("Invalid deposit tx output script P2SH address.")

    # Store channel
    deposit_txid = str(deposit_tx.hash)
    self._store_channel(
        deposit_txid,
        {'deposit_tx': deposit_tx, 'redeem_script': redeem_script, 'payment_tx': None})
def open(self, deposit_tx, redeem_script):
    """Validate a new payment channel and keep it in ``self.channels``.

    :param deposit_tx: hex-serialized deposit transaction.
    :param redeem_script: hex-serialized payment channel redeem script.
    :raises AssertionError: if the redeem script's merchant public key does
        not match ``self.PRIVATE_KEY`` or the deposit transaction fails any
        of the checks below.

    The checks raise explicitly rather than via ``assert`` so that they
    still run when Python is started with ``-O``; the exception type is
    unchanged.
    """
    # Deserialize deposit tx and redeem script
    deposit_tx = bitcoin.Transaction.from_hex(deposit_tx)
    deposit_txid = str(deposit_tx.hash)
    redeem_script = statemachine.PaymentChannelRedeemScript.from_bytes(
        codecs.decode(redeem_script, 'hex_codec'))

    # Validate redeem_script
    merchant_key = redeem_script.merchant_public_key.compressed_bytes
    if not merchant_key == self.PRIVATE_KEY.public_key.compressed_bytes:
        raise AssertionError()

    # Validate deposit tx
    if not len(deposit_tx.outputs) == 1:
        raise AssertionError("Invalid deposit tx outputs.")
    output_index = deposit_tx.output_index_for_address(redeem_script.hash160())
    if output_index is None:
        raise AssertionError("Missing deposit tx P2SH output.")
    output_script = deposit_tx.outputs[output_index].script
    if not output_script.is_p2sh():
        raise AssertionError("Invalid deposit tx output P2SH script.")
    if not output_script.get_hash160() == redeem_script.hash160():
        raise AssertionError("Invalid deposit tx output script P2SH address.")

    self.channels[deposit_txid] = {
        'deposit_tx': deposit_tx,
        'redeem_script': redeem_script,
        'payment_tx': None,
    }
def bytes_to_hex_str(arg_bytes):
    """Return the lowercase hexadecimal text for ``arg_bytes``."""
    hex_digits = codecs.encode(arg_bytes, 'hex')
    return hex_digits.decode('ascii')
def hex_str_to_bytes(arg_str):
    """Return the bytes encoded by the hexadecimal text ``arg_str``."""
    ascii_digits = arg_str.encode('ascii')
    return codecs.decode(ascii_digits, 'hex')
def parse_42_guid(guid):
    """Unpack a dash-separated hex GUID into a ``(uint32, uint32, uint64)`` tuple.

    Dashes are dropped, the first 32 hex digits are decoded to 16 bytes and
    those are read big-endian as two 32-bit values and one 64-bit value.
    """
    hex_digits = guid.replace('-', '')[:32]
    raw = codecs.decode(hex_digits, "hex")
    return struct.unpack('>IIQ', raw)
def unistr(text, errors='strict'):
    # type: (Union[str, bytes], str) -> str
    """Return ``text`` as a text string.

    Text input is returned unchanged.  Anything else is decoded as UTF-8
    with the given ``errors`` policy; if that raises ``UnicodeDecodeError``
    and ``HAS_FSCODEC`` is set, ``os.fsdecode`` is used instead, otherwise
    the error propagates.
    """
    if not isinstance(text, text_type):
        try:
            text = text.decode('utf-8', errors=errors)
        except UnicodeDecodeError:
            if not HAS_FSCODEC:
                raise
            text = os.fsdecode(text)
    return text
def unifilename(filename):
    # type: (Union[str, bytes]) -> str
    """Return ``filename`` as a text string.

    Text input is returned unchanged.  Otherwise decoding is attempted with
    the filesystem encoding, then with UTF-8, and finally with
    ``os.fsdecode`` when ``HAS_FSCODEC`` is set.  If none of these applies
    the last ``UnicodeDecodeError`` propagates.
    """
    if isinstance(filename, text_type):
        return filename

    try:
        decoded = filename.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        try:
            decoded = filename.decode('utf-8')
        except UnicodeDecodeError:
            if not HAS_FSCODEC:
                raise
            decoded = os.fsdecode(filename)
    return decoded
def use_startstyle(self, inlinestyletext):
    # type: (Optional[str]) -> None
    """Set ``self.initial_style`` from an inline, YAML-like style string.

    Nothing happens when ``inlinestyletext`` is empty or ``None``.
    """
    if not inlinestyletext:
        return

    # Quote every identifier so that YAML-like JSON such as
    #   '{based_on_style: pep8, column_limit: 79}'
    # becomes
    #   '{"based_on_style": "pep8", "column_limit": 79}'
    # which the JSON decoder accepts.
    # NOTE(review): the pattern needs at least two characters, so a
    # single-letter identifier is left unquoted -- confirm this is intended.
    quoted = re.sub(r'([a-zA-Z_]\w+)', r'"\1"', inlinestyletext)
    decoder = json.JSONDecoder()
    d = decoder.decode(quoted)  # type: ignore
    self.initial_style = style_make(d)
def surrdecode(s, enc='utf-8'):
    # type: (bytes, str) -> str
    """Decode ``s`` with ``enc``, replacing bytes that cannot be decoded."""
    decoded = s.decode(enc, 'replace')
    return decoded