我们从 Python 开源项目中提取了以下 46 个代码示例，用于说明如何使用 base64.decodebytes()。
def load_local_session(self):
    """Fill the in-memory session map from the files in the storage directory.

    Each entry of ``self.__storage_path__`` is read as one session: the file
    name is used as the session ID and the file content is base64-encoded
    JSON. Nothing is done when no storage path is configured.
    """
    storage_dir = self.__storage_path__
    if storage_dir is None:
        return

    for session_id in os.listdir(storage_dir):
        session_file = os.path.join(storage_dir, session_id)
        with open(session_file, 'rb') as handle:
            encoded = handle.read()
        # The file holds base64 text; the decoded payload is a JSON document.
        payload = base64.decodebytes(encoded).decode()
        self.__session_map__[session_id] = json.loads(payload)
def _open_server(self):
    """Connect to the configured SMTP server and authenticate.

    Opens a connection to ``params['smtp_server']`` (port
    ``params['smtp_port']``, default 587), issues EHLO and STARTTLS, and
    stores the connection on ``self.server``. When
    ``params['use_gmail_oauth2']`` is truthy, an ``AUTH XOAUTH2`` command is
    sent using the access token from ``_get_oauth_info()``; otherwise a
    password login with ``params['sender_password']`` is attempted.

    Returns:
        bool: True when authentication succeeded, False otherwise.
    """
    self.server = SMTP(params['smtp_server'], params.get("smtp_port", 587))
    self.server.ehlo()
    self.server.starttls()

    if params.get('use_gmail_oauth2'):
        auth_email, access_token = _get_oauth_info()
        auth_string = (
            b'user=' + bytes(params['sender_email_address'], 'ascii')
            + b'\1auth=Bearer ' + access_token + b'\1\1'
        )
        # The auth string embeds the bearer token, so it is deliberately
        # never written to the log.
        code, msg = self.server.docmd(
            'AUTH', 'XOAUTH2 ' + base64.b64encode(auth_string).decode('ascii'))
        if code == 235:
            # A success reply is plain text; decoding it as base64 could raise.
            log.info("Code {} Message {}", code, msg)
            return True
        # NOTE(review): failure replies are presumably base64 (the previous
        # code decoded them unconditionally); fall back to the raw reply when
        # they are not.
        try:
            detail = base64.decodebytes(msg)
        except (ValueError, TypeError):
            detail = msg
        log.info("Code {} Message {}", code, detail)
        # Send an empty line and log whatever the server answers.
        code, msg = self.server.docmd('')
        log.info("code {}, Message {}", code, msg)
        return False

    try:
        self.server.login(params['sender_email_address'], params['sender_password'])
    except SMTPAuthenticationError:
        # ``exc_info`` (not ``ex_info``) is the keyword that attaches the traceback.
        log.warning("SMTP Password Authentication Failed", exc_info=True)
        return False
    return True
def construct_yaml_binary(self, node):
    # type: (Any) -> Any
    """Return the bytes encoded by a base64 scalar node.

    Raises ConstructorError when the scalar cannot be encoded as ASCII or
    when the base64 decoder rejects it.
    """
    try:
        encoded = self.construct_scalar(node).encode('ascii')
    except UnicodeEncodeError as exc:
        problem = "failed to convert base64 data into ascii: %s" % exc
        raise ConstructorError(None, None, problem, node.start_mark)

    try:
        # ``decodestring`` is used only where ``decodebytes`` is missing.
        decoder = (base64.decodebytes if hasattr(base64, 'decodebytes')
                   else base64.decodestring)
        return decoder(encoded)
    except binascii.Error as exc:
        problem = "failed to decode base64 data: %s" % exc
        raise ConstructorError(None, None, problem, node.start_mark)
def construct_python_bytes(self, node):
    # type: (Any) -> Any
    """Decode a base64 scalar node into a bytes object.

    ConstructorError is raised both when the scalar is not ASCII-encodable
    and when it is not decodable as base64.
    """
    try:
        ascii_payload = self.construct_scalar(node).encode('ascii')
    except UnicodeEncodeError as exc:
        raise ConstructorError(
            None,
            None,
            "failed to convert base64 data into ascii: %s" % exc,
            node.start_mark)

    try:
        if not hasattr(base64, 'decodebytes'):
            # Fallback spelling for interpreters without ``decodebytes``.
            return base64.decodestring(ascii_payload)
        return base64.decodebytes(ascii_payload)
    except binascii.Error as exc:
        raise ConstructorError(
            None,
            None,
            "failed to decode base64 data: %s" % exc,
            node.start_mark)
def load_lre_list():
    """ Load the LRE utterance list as a string array.

    The list location is stored base64-encoded, fetched through `get_file`
    into the path given by `get_datasetpath(root='~')`, and parsed as a
    space-delimited table whose first (header) row is skipped.

    The header includes the following columns:

     * name: LDC2017E22/data/ara-acm/ar-20031215-034005_0-a.sph
     * lre: {'train17', 'eval15', 'train15', 'dev17', 'eval17'}
     * language: {'ara-arb', 'eng-sas', 'fre-hat', 'zho-wuu',
                  'eng-gbr', 'ara-ary', 'eng-usg', 'spa-lac',
                  'ara-apc', 'qsl-pol', 'spa-eur', 'fre-waf',
                  'zho-cdo', 'qsl-rus', 'spa-car', 'ara-arz',
                  'zho-cmn', 'por-brz', 'zho-yue', 'zho-nan',
                  'ara-acm'}
     * corpus: {'pcm', 'alaw', 'babel', 'ulaw', 'vast', 'mls14'}
     * duration: {'3', '30', '5', '15', '10', '20', '1000', '25'}

    Note
    ----
    Suggested naming scheme:
      `lre/lang/corpus/dur/base_name`
    """
    encoded_url = b'aHR0cHM6Ly9zMy5hbWF6b25hd3MuY29tL2FpLWRhdGFzZXRzL2xyZV9saXN0LnR4dA==\n'
    url = base64.decodebytes(encoded_url).decode('utf-8')
    local_path = get_file(
        fname=os.path.basename(url),
        origin=url,
        outdir=get_datasetpath(root='~'),
    )
    return np.genfromtxt(fname=local_path, dtype=str, delimiter=' ', skip_header=1)
def get_response_sspi(self, challenge=None):
    """Run one SSPI authorization step and return the response as base64 text.

    The previous implementation relied on bare ``except:`` fallbacks to
    ``base64.decodestring`` / ``base64.encodestring``; those functions were
    removed in Python 3.9, so a bytes token crashed there. The type is now
    checked explicitly and ``b64decode`` / ``b64encode`` are used instead.

    Args:
        challenge: Base64 challenge (text or bytes), or a falsy value when
            there is no challenge to pass on.

    Returns:
        The response token as base64 text without line breaks, or None when
        ``self.sspi_client.authorize`` raises ``pywintypes.error``.
    """
    dprint("pywin32 SSPI")
    if challenge:
        if not isinstance(challenge, bytes):
            challenge = challenge.encode("utf-8")
        challenge = base64.b64decode(challenge)

    try:
        error_msg, output_buffer = self.sspi_client.authorize(challenge)
    except pywintypes.error:
        traceback.print_exc(file=sys.stdout)
        return None

    token = output_buffer[0].Buffer
    if not isinstance(token, bytes):
        token = token.encode("utf-8")
    # b64encode emits no newlines, so no post-processing is required.
    return base64.b64encode(token).decode("utf-8")
def _is_valid_ssh_rsa_public_key(openssh_pubkey):
    """Return True when *openssh_pubkey* looks like a well-formed OpenSSH key line.

    This is a "good enough" check: the base64 blob (second field) must start
    with a 4-byte big-endian length followed by a string equal to the key
    type given in the first field. Malformed input (too few fields, invalid
    base64, blob shorter than the length prefix) yields False instead of
    raising.

    See http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression
    """
    import binascii
    import struct
    try:
        from base64 import decodebytes as base64_decode
    except ImportError:
        # deprecated and redirected to decodebytes in Python 3
        from base64 import decodestring as base64_decode

    parts = openssh_pubkey.split()
    if len(parts) < 2:
        return False
    key_type, key_string = parts[0], parts[1]

    int_len = 4
    try:
        data = base64_decode(key_string.encode())  # pylint:disable=deprecated-method
        # For "ssh-rsa" the embedded length is 7.
        str_len = struct.unpack('>I', data[:int_len])[0]
    except (binascii.Error, struct.error):
        return False
    return data[int_len:int_len + str_len] == key_type.encode()
def is_valid_ssh_rsa_public_key(openssh_pubkey):
    """Heuristically validate an OpenSSH public key line.

    The key is accepted when its base64 field decodes to a blob whose
    length-prefixed header string equals the key type in the first field.
    Any malformed input returns False rather than raising: fewer than two
    whitespace-separated fields, undecodable base64, or a blob too short to
    hold the 4-byte length prefix.

    See http://stackoverflow.com/questions/2494450/ssh-rsa-public-key-validation-using-a-regular-expression
    """
    # pylint: disable=line-too-long
    import binascii
    import struct
    try:
        from base64 import decodebytes as base64_decode
    except ImportError:
        # deprecated and redirected to decodebytes in Python 3
        from base64 import decodestring as base64_decode

    fields = openssh_pubkey.split()
    if len(fields) < 2:
        return False
    key_type = fields[0]
    key_string = fields[1]

    try:
        data = base64_decode(key_string.encode())  # pylint:disable=deprecated-method
    except binascii.Error:
        return False

    int_len = 4
    if len(data) < int_len:
        # Not even a complete length prefix; struct.unpack would raise.
        return False
    str_len = struct.unpack('>I', data[:int_len])[0]  # 7 for "ssh-rsa"
    return data[int_len:int_len + str_len] == key_type.encode()
def decode_basic_auth(logger, http_auth):
    """
    Decodes basic auth from the header string.

    The decoded credentials are split on the first colon only, so a
    passphrase may itself contain colons. Headers that are missing, not
    ``Basic``, not valid base64/UTF-8, or lacking a colon all yield
    ``(None, None)``. The passphrase is never written to the log.

    :param logger: logging instance (may be falsy to disable logging)
    :type logger: Logger
    :param http_auth: Basic authentication header
    :type http_auth: string
    :returns: tuple -- (username, passphrase) or (None, None) if empty.
    :rtype: tuple
    """
    if http_auth is not None and http_auth.lower().startswith('basic '):
        try:
            decoded = base64.decodebytes(
                http_auth[6:].encode('utf-8')).decode()
        except (base64.binascii.Error, UnicodeDecodeError):
            decoded = None
        if decoded is not None:
            username, separator, passphrase = decoded.partition(':')
            if separator:
                if logger:
                    logger.debug('Credentials given for user: %s', username)
                return (username, passphrase)
        if logger:
            logger.info(
                'Bad base64 data sent. Setting to no user/pass.')
    # Default meaning no user or password
    return (None, None)
def save_png(self, filename):
    """Write the diagram's PNG data to *filename*.

    The PNG data only exists once the widget has been displayed. To
    display the widget and save an image in one step, use `auto_save_png`.
    A warning is issued (and no file is written) when no PNG data is
    available.

    Parameters
    ----------
    filename : string
    """
    if not self.png:
        warnings.warn('No png image available! Try auto_save_png() instead?')
        return

    # ``self.png`` holds base64 text; decode it back to raw image bytes.
    image_bytes = base64.decodebytes(bytes(self.png, 'ascii'))
    with open(filename, 'wb') as out:
        out.write(image_bytes)
def get(self, key, default=None, version=None):
    """
    Look up *key* in the cache collection.

    Returns the unpickled stored value, or *default* (None unless given)
    when no document exists for the key.
    """
    full_key = self.make_key(key, version)
    self.validate_key(full_key)

    document = self._coll.find_one({'key': full_key})
    if document:
        # NOTE(review): pickle.loads executes arbitrary code if the stored
        # data is attacker-controlled; the collection must be trusted.
        return pickle.loads(base64.decodebytes(document['data']))
    return default
def get_many(self, keys, version=None):
    """
    Fetch a bunch of keys from the cache. For certain backends (memcached,
    pgsql) this can be *much* faster when fetching multiple values.

    Returns a dict mapping each key in keys to its value. If the given
    key is missing, it will be missing from the response dict.
    """
    out = {}
    # Maps the fully-qualified cache key back to the caller's key.
    parsed_keys = {}
    for key in keys:
        pkey = self.make_key(key, version)
        self.validate_key(pkey)
        parsed_keys[pkey] = key

    # ``$in`` needs a real list: on Python 3 ``dict.keys()`` is a view
    # object, which is not a sequence the query encoder can serialise.
    data = self._coll.find({'key': {'$in': list(parsed_keys)}})
    for result in data:
        # NOTE(review): pickle.loads executes arbitrary code if the stored
        # data is attacker-controlled; the collection must be trusted.
        unencoded = base64.decodebytes(result['data'])
        out[parsed_keys[result['key']]] = pickle.loads(unencoded)
    return out
def parse_request(self):
    """Parse the request and, when ``DO_AUTH`` is set, enforce Basic auth.

    Returns False (after sending an error response) when the base class
    rejects the request, when the Authorization header is missing
    (401), malformed or undecodable (400), uses a scheme other than
    ``Basic`` (501), or when ``get_userinfo`` rejects the credentials (401).
    Returns True otherwise.
    """
    if not six.moves.BaseHTTPServer.BaseHTTPRequestHandler.parse_request(self):
        return False
    if not self.DO_AUTH:
        return True

    authorization = self.headers.get('Authorization', '')
    if not authorization:
        self.send_autherror(401, b"Authorization Required")
        return False

    # Split into "<scheme> <credentials>"; anything else is a malformed
    # header and must not raise out of the handler.
    parts = authorization.split(None, 1)
    if len(parts) != 2:
        self.send_error(400)
        return False
    scheme, credentials = parts
    if scheme != 'Basic':
        self.send_error(501)
        return False

    try:
        credentials = base64.decodebytes(credentials.encode()).decode()
    except ValueError:
        # Covers both invalid base64 (binascii.Error) and invalid UTF-8.
        self.send_error(400)
        return False

    # Only the first colon separates user from password; the password
    # itself may contain colons.
    user, separator, password = credentials.partition(':')
    if not separator:
        self.send_error(400)
        return False

    if not self.get_userinfo(user, password, self.command):
        self.send_autherror(401, b"Authorization Required")
        return False
    return True
def _internal_service_call(self, share_data):
    """Run the internal image service on the request held by *share_data*.

    The request data is base64-decoded and wrapped as an uploaded JPEG
    named ``test.jpg``. For request type ``"image"`` the CNN prediction is
    run and the label mapped from the first predicted key is stored as
    output; for any other type a fixed message is stored. Any exception is
    re-raised wrapped in a plain ``Exception``.

    Returns the same *share_data* object.
    """
    try:
        # internal : IMAGE
        print("?????????? ??? ?? ?? ?? ?? ?????????? ")
        request_type = share_data.get_request_type()
        image_bytes = base64.decodebytes(str.encode(share_data.get_request_data()))
        upload = InMemoryUploadedFile(io.BytesIO(image_bytes), None, 'test.jpg', 'image/jpeg', len(image_bytes), None)
        files = MultiValueDict({'test': [upload]})

        if request_type != "image":
            share_data.set_output_data("??? ?? ??? ????")
            return share_data

        # CNN Prediction
        prediction = PredictNetCnn().run('nn00004', None, files)
        name_tag = {"KYJ" : "???", "KSW" : "???", "LTY" : "???", "LSH" : "???", "PJH" : "???", "KSS" : "???", "PSC" : "???"}
        top_key = prediction['test.jpg']['key'][0]
        print("?????????? ??? ?? ?? ?? ?? : " + top_key)
        share_data.set_output_data(name_tag[top_key] + "?? ??? ????")
        return share_data
    except Exception as e:
        raise Exception(e)
def load(self, config):
    """Load this setting's value from *config*.

    The option ``self.token`` in ``self.section`` is read and stripped. The
    literal text ``"None"`` maps to ``None``; any other text is
    base64-decoded (a decoding failure is logged and the raw text kept) and
    then passed through ``FS.safe``. When the option or section does not
    exist, the value comes from ``self.default_value_func()``.
    """
    try:
        raw = config.get(self.section, self.token).strip()
        if raw == "None":
            resolved = None
        else:
            try:
                raw = base64.decodebytes(raw.encode("utf-8")).decode('utf-8')
            except Exception as exc:
                logger.warning(
                    "Failed to decode work dir path ({})".format(raw),
                    exc_info=exc
                )
            resolved = FS.safe(raw)
        self.value = resolved
    except (configparser.NoOptionError, configparser.NoSectionError):
        pass
    else:
        return
    self.value = self.default_value_func()
def _retrieve_content(self, compression, encoding, content):
    """Extract the content of the sent file.

    *content* is ASCII text in the given *encoding* (only ``'base64'`` is
    accepted); the decoded bytes are then decompressed according to
    *compression* (``None`` or ``'bzip2'``). Unsupported values raise
    ValueError, with the compression checked first.
    """
    uncompressed = compression is None
    if not uncompressed and not (compression == 'bzip2'):
        raise ValueError('Invalid compression: %s' % compression)
    if not (encoding == 'base64'):
        raise ValueError('Invalid encoding: %s' % encoding)

    decoded = base64.decodebytes(content.encode("ascii"))
    if uncompressed:
        return decoded
    return bz2.decompress(decoded)
def base64_decode(input, errors='strict'):
    """Decode base64 *input*; return ``(decoded_bytes, len(input))``.

    Only the ``'strict'`` error mode is accepted (checked with ``assert``).
    """
    assert errors == 'strict'
    decoded = base64.decodebytes(input)
    consumed = len(input)
    return (decoded, consumed)
def decode(self, input, final=False):
    """Return *input* decoded from base64.

    ``self.errors`` must be ``'strict'`` (checked with ``assert``); *final*
    is accepted but not used.
    """
    assert self.errors == 'strict'
    decoded = base64.decodebytes(input)
    return decoded
def decrypt(self, source):
    """Decrypt a URL-quoted, base64-encoded ciphertext string.

    The text is unquoted, base64-decoded, run through a fresh decryptor
    from ``self._cipher``, stripped of trailing NUL bytes and decoded with
    ``self._encoding``.
    """
    decryptor = self._cipher.decryptor()
    ciphertext = base64.decodebytes(
        urllib.parse.unquote(source).encode("utf-8"))
    plaintext = decryptor.update(ciphertext) + decryptor.finalize()
    # Trailing NUL bytes are treated as padding and removed.
    return plaintext.rstrip(b"\x00").decode(self._encoding)
def construct_yaml_binary(self, node):
    """Construct bytes from a base64-encoded scalar node.

    A scalar that is not ASCII-encodable, or that the base64 decoder
    rejects, results in a ConstructorError pointing at the node's start.
    """
    try:
        raw = self.construct_scalar(node).encode('ascii')
    except UnicodeEncodeError as exc:
        detail = "failed to convert base64 data into ascii: %s" % exc
        raise ConstructorError(None, None, detail, node.start_mark)

    use_decodebytes = hasattr(base64, 'decodebytes')
    try:
        if use_decodebytes:
            return base64.decodebytes(raw)
        return base64.decodestring(raw)
    except binascii.Error as exc:
        detail = "failed to decode base64 data: %s" % exc
        raise ConstructorError(None, None, detail, node.start_mark)
def construct_python_bytes(self, node):
    """Turn a base64 scalar node into a bytes value.

    Raises ConstructorError if the scalar contains non-ASCII characters or
    cannot be decoded as base64.
    """
    start = node.start_mark if False else None  # placeholder removed below
    del start
    try:
        b64_text = self.construct_scalar(node).encode('ascii')
    except UnicodeEncodeError as exc:
        raise ConstructorError(
            None, None,
            "failed to convert base64 data into ascii: %s" % exc,
            node.start_mark)

    try:
        decode_fn = (base64.decodebytes
                     if hasattr(base64, 'decodebytes')
                     else base64.decodestring)
        result = decode_fn(b64_text)
    except binascii.Error as exc:
        raise ConstructorError(
            None, None,
            "failed to decode base64 data: %s" % exc,
            node.start_mark)
    return result
def test_get_base64(self):
    """A pickled blob fetched with format="base64" round-trips intact."""
    expected = {"one": 1, "two": [2, 3]}
    uploaded = self.bucket.blob("test.pickle")
    uploaded.upload_from_string(pickle.dumps(expected))

    model = self.contents_manager.get(
        self.path("test.pickle"), format="base64")

    # Metadata of the returned model.
    self.assertEqual(model["type"], "file")
    self.assertEqual(model["mimetype"], "application/octet-stream")
    self.assertEqual(model["format"], "base64")

    # Content is base64 text that decodes back to the pickled object.
    encoded = model["content"]
    self.assertIsInstance(encoded, str)
    raw = base64.decodebytes(encoded.encode())
    self.assertEqual(expected, pickle.loads(raw))
def _save_file(self, path, content, format):
    """Uploads content of a generic file to GCS.

    The bucket is resolved first, then *format* is validated, then the
    content is converted to bytes (UTF-8 for "text", base64-decoded for
    "base64") and uploaded. An invalid format or a conversion failure is
    reported as an HTTP 400 error.

    :param: path blob path.
    :param: content file contents string.
    :param: format the description of the input format, can be either
        "text" or "base64".
    :return: created :class:`google.cloud.storage.Blob`.
    """
    bucket_name, blob_name = self._parse_path(path)
    target_bucket = self._get_bucket(bucket_name, throw=True)

    allowed_formats = {"text", "base64"}
    if format not in allowed_formats:
        raise web.HTTPError(
            400,
            u"Must specify format of file contents as \"text\" or "
            u"\"base64\"",
        )

    try:
        payload = (
            content.encode("utf8")
            if format == "text"
            else base64.decodebytes(content.encode("ascii"))
        )
    except Exception as e:
        raise web.HTTPError(
            400, u"Encoding error saving %s: %s" % (path, e)
        )

    created = target_bucket.blob(blob_name)
    created.upload_from_string(payload)
    return created
def testNonce(self):
    """
    The generated Sec-WebSocket-Key must decode to exactly 16 bytes.
    """
    encoded_key = _create_sec_websocket_key().encode("utf-8")
    decoded_nonce = base64decode(encoded_key)
    self.assertEqual(16, len(decoded_nonce))
def load_from_base64(self, b64_msg):
    """Deserialize a base64-encoded YAML message.

    The message is base64-decoded and the result handed to
    ``self.load_from_yaml``, whose return value is returned unchanged.
    """
    yaml_bytes = base64.decodebytes(b64_msg)
    return self.load_from_yaml(yaml_bytes)
def base64_decodestring(s):
    """Decode base64 data *s* on both Python 2 and Python 3.

    On Python 2 this is ``base64.decodestring(s)``. On Python 3 a text
    argument is first encoded as UTF-8, and the decoded bytes are returned
    as UTF-8 text.
    """
    if not six.PY2:
        raw = s.encode('utf8') if isinstance(s, str) else s
        return base64.decodebytes(raw).decode('utf8')
    return base64.decodestring(s)
def open_data(self, url, data=None):
    """Use "data" URL.

    Builds a message with Date, Content-type and Content-Length headers
    followed by the decoded payload, and returns it wrapped in an
    ``addinfourl`` object. POSTed *data* is ignored.
    """
    if not isinstance(url, str):
        raise URLError('data error: proxy support for data protocol currently not implemented')
    # syntax of data URLs:
    # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data
    # mediatype := [ type "/" subtype ] *( ";" parameter )
    # data      := *urlchar
    # parameter := attribute "=" value
    try:
        mediatype, payload = url.split(',', 1)
    except ValueError:
        raise IOError('data error', 'bad data URL')
    mediatype = mediatype or 'text/plain;charset=US-ASCII'

    # A trailing ";token" without "=" names the transfer encoding.
    transfer_encoding = ''
    last_semi = mediatype.rfind(';')
    if last_semi >= 0 and '=' not in mediatype[last_semi:]:
        transfer_encoding = mediatype[last_semi+1:]
        mediatype = mediatype[:last_semi]

    timestamp = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                              time.gmtime(time.time()))
    lines = ['Date: %s' % timestamp, 'Content-type: %s' % mediatype]
    if transfer_encoding == 'base64':
        # XXX is this encoding/decoding ok?
        payload = base64.decodebytes(payload.encode('ascii')).decode('latin-1')
    else:
        payload = unquote(payload)
    lines += ['Content-Length: %d' % len(payload), '', payload]

    raw_message = '\n'.join(lines)
    parsed_headers = email.message_from_string(raw_message)
    body_stream = io.StringIO(raw_message)
    #body_stream.fileno = None     # needed for addinfourl
    return addinfourl(body_stream, parsed_headers, url)
def decode(self, data):
    """Base64-decode *data* and store the result on ``self.data``."""
    decoded = base64.decodebytes(data)
    self.data = decoded
def getparser(use_datetime=False, use_builtin_types=False):
    """getparser() -> parser, unmarshaller

    Build the fastest parser that is available, wired to an unmarshalling
    target, and return the pair ``(parser, target)``.
    """
    if not (FastParser and FastUnmarshaller):
        # No complete accelerated pair: use the regular unmarshaller, fed by
        # whichever parser exists.
        target = Unmarshaller(use_datetime=use_datetime,
                              use_builtin_types=use_builtin_types)
        parser = FastParser(target) if FastParser else ExpatParser(target)
        return parser, target

    # Pick the converters handed to the accelerated unmarshaller.
    if use_builtin_types:
        mkdatetime, mkbytes = _datetime_type, base64.decodebytes
    elif use_datetime:
        mkdatetime, mkbytes = _datetime_type, _binary
    else:
        mkdatetime, mkbytes = _datetime, _binary
    target = FastUnmarshaller(True, False, mkbytes, mkdatetime, Fault)
    parser = FastParser(target)
    return parser, target

##
# Convert a Python tuple or a Fault instance to an XML-RPC packet.
#
# @def dumps(params, **options)
# @param params A tuple or Fault instance.
# @keyparam methodname If given, create a methodCall request for
#     this method name.
# @keyparam methodresponse If given, create a methodResponse packet.
#     If used with a tuple, the tuple must be a singleton (that is,
#     it must contain exactly one element).
# @keyparam encoding The packet encoding.
# @return A string containing marshalled data.
def from_jsonb64(msg: str):
    """Return the bytes encoded by the base64 text *msg*."""
    encoded = msg.encode()
    return base64.decodebytes(encoded)

# TODO: monkeypatch is ugly but I'm in a hurry...
def set_payout_address(client, wallet):
    """ Register the wallet's current address as the payout address.

    The payout address is set server-side on a per-account basis. So when
    one user has different wallets on different machines, all mining
    proceeds from every machine are sent to this address.

    Args:
        client (TwentyOneRestClient): rest client used for communication with the backend api
        wallet (two1.wallet.Wallet): a user's wallet instance

    Returns:
        bytes: extra nonce 1 which is required for computing the coinbase transaction
        int: the size in bytes of the extra nonce 2
        int: reward amount given upon successful solution found
    """
    response = client.account_payout_address_post(wallet.current_address)
    user_info = json.loads(response.text)

    # The server sends extra nonce 1 as base64 text.
    enonce1 = base64.decodebytes(user_info["enonce1"].encode())
    return enonce1, user_info["enonce2_size"], user_info["reward"]
def get_work(client):
    """ Fetch work from the pool through the rest client.

    A ``ServerRequestError`` with a recognised status/error code is turned
    into a ``MiningDisabledError`` carrying a user-facing explanation plus
    the profile call-to-action; any other ``ServerRequestError`` is
    re-raised unchanged.

    Args:
        client (TwentyOneRestClient): rest client used for communication with the backend api

    Returns:
        WorkNotification: a Swirl work notification message
    """
    try:
        response = client.get_work()
    except exceptions.ServerRequestError as e:
        profile_url = "{}/{}".format(two1.TWO1_WWW_HOST, client.username)
        profile_cta = uxstring.UxString.mining_profile_call_to_action.format(profile_url)

        err_string = None
        if e.status_code == 403:
            error_code = e.data.get("error")
            if error_code == "TO201":
                err_string = uxstring.UxString.Error.suspended_account
            elif error_code == "TO501":
                err_string = uxstring.UxString.monthly_mining_limit_reached
            elif error_code == "TO502":
                err_string = uxstring.UxString.lifetime_earn_limit_reached
            elif error_code == "TO503":
                err_string = uxstring.UxString.no_earn_allocations.format(two1.TWO1_WWW_HOST, client.username)
        elif e.status_code == 404:
            if bitcoin_computer.has_mining_chip():
                err_string = uxstring.UxString.monthly_mining_limit_reached
            else:
                err_string = uxstring.UxString.earn_limit_reached

        if not err_string:
            raise e
        raise exceptions.MiningDisabledError("{}\n\n{}".format(err_string, profile_cta))

    # The response body is a base64-encoded Swirl message.
    raw_message = base64.decodebytes(response.content)
    return message_factory.SwirlMessageFactory().read_object(raw_message)
def verify(self, message, signature):
    """Verify a base64-encoded *signature* over *message*.

    The signature text is base64-decoded and the message converted with
    ``self.ensure_bytes`` before the result of ``self._verify`` is
    returned. Raises ``Exception`` when this key has no public key.
    """
    # The signature is assumed to arrive base64 encoded.
    raw_signature = b64decode(signature.encode())
    message_bytes = self.ensure_bytes(message)

    # Verification is only possible with a public key.
    if self._public_key:
        return self._verify(message_bytes, raw_signature)
    raise Exception("Key object does not have public key defined, and thus cannot be used to verify.")
def setUp(self):
    """Write the message-catalog fixtures and reset the gettext state.

    Creates LOCALEDIR if needed, decodes the three base64 fixtures into
    their catalog files, sets LANGUAGE to 'xx' through an environment guard
    and clears gettext's translation cache.
    """
    if not os.path.isdir(LOCALEDIR):
        os.makedirs(LOCALEDIR)

    fixtures = (
        (MOFILE, GNU_MO_DATA),
        (UMOFILE, UMO_DATA),
        (MMOFILE, MMO_DATA),
    )
    for catalog_path, encoded in fixtures:
        with open(catalog_path, 'wb') as fp:
            fp.write(base64.decodebytes(encoded))

    self.env = support.EnvironmentVarGuard()
    self.env['LANGUAGE'] = 'xx'
    gettext._translations.clear()
def test_encoding(self):
    """The payload of ``self._au`` base64-decodes to ``self._audiodata``."""
    encoded = bytes(self._au.get_payload(), 'ascii')
    decoded = base64.decodebytes(encoded)
    self.assertEqual(decoded, self._audiodata)
def open_data(self, url, data=None):
    """Use "data" URL.

    The part before the first comma is the media type (optionally ending in
    an encoding such as ``;base64``); the rest is the payload. The result is
    an ``addinfourl`` over a text message made of Date, Content-type and
    Content-Length headers plus the decoded payload. POSTed *data* is
    ignored.
    """
    if not isinstance(url, str):
        raise URLError('data error', 'proxy support for data protocol currently not implemented')
    # syntax of data URLs:
    # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data
    # mediatype := [ type "/" subtype ] *( ";" parameter )
    # data      := *urlchar
    # parameter := attribute "=" value
    try:
        content_type, body = url.split(',', 1)
    except ValueError:
        raise IOError('data error', 'bad data URL')
    if not content_type:
        content_type = 'text/plain;charset=US-ASCII'

    # Peel off a trailing ";<encoding>" that is not a "name=value" parameter.
    split_at = content_type.rfind(';')
    has_encoding = split_at >= 0 and '=' not in content_type[split_at:]
    encoding = content_type[split_at+1:] if has_encoding else ''
    if has_encoding:
        content_type = content_type[:split_at]

    date_header = 'Date: %s'%time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                                          time.gmtime(time.time()))
    type_header = 'Content-type: %s' % content_type
    if encoding == 'base64':
        # XXX is this encoding/decoding ok?
        body = base64.decodebytes(body.encode('ascii')).decode('latin1')
    else:
        body = unquote(body)
    length_header = 'Content-Length: %d' % len(body)

    text = '\n'.join([date_header, type_header, length_header, '', body])
    headers = email.message_from_string(text)
    stream = io.StringIO(text)
    #stream.fileno = None     # needed for addinfourl
    return addinfourl(stream, headers, url)